From 0a835436acdf6c4f010c0eb9f66b849d831c38e8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Mon, 26 Apr 2021 19:00:16 +0100 Subject: [PATCH] feat: use bitmasks within MUB (#1274) (#1289) * feat: use bitmasks within MUB (#1274) * chore: review feedback Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 1 + data_types/src/partition_metadata.rs | 135 +++-- internal_types/src/entry.rs | 33 +- internal_types/src/schema.rs | 6 +- internal_types/src/schema/builder.rs | 11 +- mutable_buffer/Cargo.toml | 1 + mutable_buffer/src/bitset.rs | 264 +++++++++ mutable_buffer/src/chunk/snapshot.rs | 16 +- mutable_buffer/src/column.rs | 667 ++++++++++++----------- mutable_buffer/src/lib.rs | 1 + mutable_buffer/src/table.rs | 331 ++++------- server/src/db.rs | 10 +- server/src/db/system_tables.rs | 4 +- server/src/query_tests/sql.rs | 4 +- tests/end_to_end_cases/management_api.rs | 6 +- tests/end_to_end_cases/management_cli.rs | 2 +- 16 files changed, 866 insertions(+), 626 deletions(-) create mode 100644 mutable_buffer/src/bitset.rs diff --git a/Cargo.lock b/Cargo.lock index 085ab2096b..be8e7a597c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1889,6 +1889,7 @@ dependencies = [ "internal_types", "observability_deps", "parking_lot", + "rand 0.8.3", "snafu", "test_helpers", "tokio", diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs index 8cce3d9f9b..7a5eb16d7c 100644 --- a/data_types/src/partition_metadata.rs +++ b/data_types/src/partition_metadata.rs @@ -1,10 +1,10 @@ //! This module contains structs that describe the metadata for a partition //! including schema, summary statistics, and file locations in storage. -use std::fmt::{Debug, Display}; use std::mem; use serde::{Deserialize, Serialize}; +use std::borrow::Borrow; /// Describes the schema, summary statistics for each column in each table and /// the location of the partition in storage. @@ -222,8 +222,8 @@ impl Statistics { } /// Summary statistics for a column. -#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] -pub struct StatValues { +#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Default)] +pub struct StatValues { pub min: T, pub max: T, /// number of non-nil values in this column @@ -232,51 +232,38 @@ pub struct StatValues { impl StatValues where - T: PartialEq + PartialOrd + Debug + Display + Clone, + T: Default + Clone, { - pub fn new(starting_value: T) -> Self { + pub fn new() -> Self { + Self::default() + } + + pub fn new_with_value(starting_value: T) -> Self { Self { min: starting_value.clone(), max: starting_value, count: 1, } } - - /// updates the statistics keeping the min, max and incrementing count. - pub fn update(&mut self, other: T) { - self.count += 1; - - let set_min = self.min > other; - let set_max = self.max < other; - - match (set_min, set_max) { - (true, true) => { - self.min = other.clone(); - self.max = other; - } - (true, false) => { - self.min = other; - } - (false, true) => { - self.max = other; - } - (false, false) => (), - } - } } -impl StatValues { - /// Function for string stats to avoid allocating if we're not updating min - /// or max - pub fn update_string(stats: &mut Self, other: &str) { - stats.count += 1; +impl StatValues { + /// updates the statistics keeping the min, max and incrementing count. + /// + /// The type plumbing exists to allow calling with &str on a StatValues + pub fn update(&mut self, other: &U) + where + T: Borrow, + U: ToOwned + PartialOrd, + { + self.count += 1; - if stats.min.as_str() > other { - stats.min = other.to_string(); + if self.count == 1 || self.min.borrow() > other { + self.min = other.to_owned(); } - if stats.max.as_str() < other { - stats.max = other.to_string(); + if self.count == 1 || self.max.borrow() < other { + self.max = other.to_owned(); } } } @@ -287,45 +274,73 @@ mod tests { #[test] fn statistics_update() { - let mut stat = StatValues::new(23); + let mut stat = StatValues::new_with_value(23); assert_eq!(stat.min, 23); assert_eq!(stat.max, 23); assert_eq!(stat.count, 1); - stat.update(55); + stat.update(&55); assert_eq!(stat.min, 23); assert_eq!(stat.max, 55); assert_eq!(stat.count, 2); - stat.update(6); + stat.update(&6); assert_eq!(stat.min, 6); assert_eq!(stat.max, 55); assert_eq!(stat.count, 3); - stat.update(30); + stat.update(&30); assert_eq!(stat.min, 6); assert_eq!(stat.max, 55); assert_eq!(stat.count, 4); } + #[test] + fn statistics_default() { + let mut stat = StatValues::new(); + assert_eq!(stat.min, 0); + assert_eq!(stat.max, 0); + assert_eq!(stat.count, 0); + + stat.update(&55); + assert_eq!(stat.min, 55); + assert_eq!(stat.max, 55); + assert_eq!(stat.count, 1); + + let mut stat = StatValues::new(); + assert_eq!(&stat.min, ""); + assert_eq!(&stat.max, ""); + assert_eq!(stat.count, 0); + + stat.update("cupcakes"); + assert_eq!(&stat.min, "cupcakes"); + assert_eq!(&stat.max, "cupcakes"); + assert_eq!(stat.count, 1); + + stat.update("woo"); + assert_eq!(&stat.min, "cupcakes"); + assert_eq!(&stat.max, "woo"); + assert_eq!(stat.count, 2); + } + #[test] fn update_string() { - let mut stat = StatValues::new("bbb".to_string()); + let mut stat = StatValues::new_with_value("bbb".to_string()); assert_eq!(stat.min, "bbb".to_string()); assert_eq!(stat.max, "bbb".to_string()); assert_eq!(stat.count, 1); - StatValues::update_string(&mut stat, "aaa"); + stat.update("aaa"); assert_eq!(stat.min, "aaa".to_string()); assert_eq!(stat.max, "bbb".to_string()); assert_eq!(stat.count, 2); - StatValues::update_string(&mut stat, "z"); + stat.update("z"); assert_eq!(stat.min, "aaa".to_string()); assert_eq!(stat.max, "z".to_string()); assert_eq!(stat.count, 3); - StatValues::update_string(&mut stat, "p"); + stat.update("p"); assert_eq!(stat.min, "aaa".to_string()); assert_eq!(stat.max, "z".to_string()); assert_eq!(stat.count, 4); @@ -333,22 +348,22 @@ mod tests { #[test] fn table_update_from() { - let mut string_stats = StatValues::new("foo".to_string()); - string_stats.update("bar".to_string()); + let mut string_stats = StatValues::new_with_value("foo".to_string()); + string_stats.update("bar"); let string_col = ColumnSummary { name: "string".to_string(), stats: Statistics::String(string_stats), }; - let mut int_stats = StatValues::new(1); - int_stats.update(5); + let mut int_stats = StatValues::new_with_value(1); + int_stats.update(&5); let int_col = ColumnSummary { name: "int".to_string(), stats: Statistics::I64(int_stats), }; - let mut float_stats = StatValues::new(9.1); - float_stats.update(1.3); + let mut float_stats = StatValues::new_with_value(9.1); + float_stats.update(&1.3); let float_col = ColumnSummary { name: "float".to_string(), stats: Statistics::F64(float_stats), @@ -359,15 +374,15 @@ mod tests { columns: vec![string_col, int_col, float_col], }; - let mut string_stats = StatValues::new("aaa".to_string()); - string_stats.update("zzz".to_string()); + let mut string_stats = StatValues::new_with_value("aaa".to_string()); + string_stats.update("zzz"); let string_col = ColumnSummary { name: "string".to_string(), stats: Statistics::String(string_stats), }; - let mut int_stats = StatValues::new(3); - int_stats.update(9); + let mut int_stats = StatValues::new_with_value(3); + int_stats.update(&9); let int_col = ColumnSummary { name: "int".to_string(), stats: Statistics::I64(int_stats), @@ -446,15 +461,15 @@ mod tests { #[test] fn from_table_summaries() { - let mut string_stats = StatValues::new("foo".to_string()); - string_stats.update("bar".to_string()); + let mut string_stats = StatValues::new_with_value("foo".to_string()); + string_stats.update("bar"); let string_col = ColumnSummary { name: "string".to_string(), stats: Statistics::String(string_stats), }; - let mut int_stats = StatValues::new(1); - int_stats.update(5); + let mut int_stats = StatValues::new_with_value(1); + int_stats.update(&5); let int_col = ColumnSummary { name: "int".to_string(), stats: Statistics::I64(int_stats), @@ -467,7 +482,7 @@ mod tests { let int_col = ColumnSummary { name: "int".to_string(), - stats: Statistics::I64(StatValues::new(10)), + stats: Statistics::I64(StatValues::new_with_value(10)), }; let table_b = TableSummary { name: "b".to_string(), @@ -481,7 +496,7 @@ mod tests { let int_col = ColumnSummary { name: "int".to_string(), - stats: Statistics::I64(StatValues::new(203)), + stats: Statistics::I64(StatValues::new_with_value(203)), }; let table_b_2 = TableSummary { name: "b".to_string(), diff --git a/internal_types/src/entry.rs b/internal_types/src/entry.rs index 85a8034ea4..3e43c04e21 100644 --- a/internal_types/src/entry.rs +++ b/internal_types/src/entry.rs @@ -1,7 +1,7 @@ //! This module contains helper code for building `Entry` and `SequencedEntry` //! from line protocol and the `DatabaseRules` configuration. -use crate::schema::TIME_COLUMN_NAME; +use crate::schema::{InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME}; use data_types::database_rules::{Error as DataError, Partitioner, ShardId, Sharder, WriterId}; use generated_types::entry as entry_fb; use influxdb_line_protocol::{FieldValue, ParsedLine}; @@ -445,6 +445,37 @@ impl<'a> Column<'a> { .expect("name must be present in flatbuffers Column") } + pub fn inner(&self) -> &entry_fb::Column<'a> { + &self.fb + } + + pub fn influx_type(&self) -> InfluxColumnType { + match (self.fb.values_type(), self.fb.logical_column_type()) { + (entry_fb::ColumnValues::BoolValues, entry_fb::LogicalColumnType::Field) => { + InfluxColumnType::Field(InfluxFieldType::Boolean) + } + (entry_fb::ColumnValues::U64Values, entry_fb::LogicalColumnType::Field) => { + InfluxColumnType::Field(InfluxFieldType::UInteger) + } + (entry_fb::ColumnValues::F64Values, entry_fb::LogicalColumnType::Field) => { + InfluxColumnType::Field(InfluxFieldType::Float) + } + (entry_fb::ColumnValues::I64Values, entry_fb::LogicalColumnType::Field) => { + InfluxColumnType::Field(InfluxFieldType::Integer) + } + (entry_fb::ColumnValues::StringValues, entry_fb::LogicalColumnType::Tag) => { + InfluxColumnType::Tag + } + (entry_fb::ColumnValues::StringValues, entry_fb::LogicalColumnType::Field) => { + InfluxColumnType::Field(InfluxFieldType::String) + } + (entry_fb::ColumnValues::I64Values, entry_fb::LogicalColumnType::Time) => { + InfluxColumnType::Timestamp + } + _ => unreachable!(), + } + } + pub fn logical_type(&self) -> entry_fb::LogicalColumnType { self.fb.logical_column_type() } diff --git a/internal_types/src/schema.rs b/internal_types/src/schema.rs index 48c6059129..086a6872b1 100644 --- a/internal_types/src/schema.rs +++ b/internal_types/src/schema.rs @@ -617,10 +617,10 @@ impl From<&InfluxColumnType> for &'static str { } } -impl ToString for InfluxColumnType { - fn to_string(&self) -> String { +impl std::fmt::Display for InfluxColumnType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let s: &str = self.into(); - s.into() + write!(f, "{}", s) } } diff --git a/internal_types/src/schema/builder.rs b/internal_types/src/schema/builder.rs index b6e1bde783..d1e0f80c8c 100644 --- a/internal_types/src/schema/builder.rs +++ b/internal_types/src/schema/builder.rs @@ -84,7 +84,7 @@ impl SchemaBuilder { ) } - /// Add a new field column with the specified InfluxDB data model type + /// Add a new field column with the specified InfluxDB data model type pub fn influx_field(self, column_name: &str, influxdb_field_type: InfluxFieldType) -> Self { let arrow_type: ArrowDataType = influxdb_field_type.into(); self.add_column( @@ -95,6 +95,15 @@ impl SchemaBuilder { ) } + /// Add a new field column with the specified InfluxDB data model type + pub fn influx_column(self, column_name: &str, column_type: InfluxColumnType) -> Self { + match column_type { + InfluxColumnType::Tag => self.tag(column_name), + InfluxColumnType::Field(field) => self.field(column_name, field.into()), + InfluxColumnType::Timestamp => self.timestamp(), + } + } + /// Add a new nullable field column with the specified Arrow datatype. pub fn field(self, column_name: &str, arrow_type: ArrowDataType) -> Self { let influxdb_column_type = arrow_type diff --git a/mutable_buffer/Cargo.toml b/mutable_buffer/Cargo.toml index 9f9deb88af..58cd714327 100644 --- a/mutable_buffer/Cargo.toml +++ b/mutable_buffer/Cargo.toml @@ -35,6 +35,7 @@ tracker = { path = "../tracker" } test_helpers = { path = "../test_helpers" } criterion = "0.3" flate2 = "1.0.20" +rand = "0.8.3" [features] default = [] diff --git a/mutable_buffer/src/bitset.rs b/mutable_buffer/src/bitset.rs new file mode 100644 index 0000000000..88b4cdf033 --- /dev/null +++ b/mutable_buffer/src/bitset.rs @@ -0,0 +1,264 @@ +use arrow_deps::arrow::buffer::Buffer; + +/// An arrow-compatible mutable bitset implementation +/// +/// Note: This currently operates on individual bytes at a time +/// it could be optimised to instead operate on usize blocks +#[derive(Debug)] +pub struct BitSet { + /// The underlying data + /// + /// Data is stored in the least significant bit of a byte first + buffer: Vec, + + /// The length of this mask in bits + len: usize, +} + +impl BitSet { + /// Creates a new BitSet + pub fn new() -> Self { + Self { + buffer: Default::default(), + len: 0, + } + } + + /// Appends `count` unset bits + pub fn append_unset(&mut self, count: usize) { + self.len += count; + let new_buf_len = (self.len + 7) >> 3; + self.buffer.resize(new_buf_len, 0); + } + + /// Appends `count` boolean values from the slice of packed bits + pub fn append_bits(&mut self, count: usize, to_set: &[u8]) { + let new_len = self.len + count; + let new_buf_len = (new_len + 7) >> 3; + self.buffer.reserve(new_buf_len - self.buffer.len()); + + let whole_bytes = count >> 3; + let overrun = count & 7; + + let skew = self.len & 7; + if skew == 0 { + self.buffer.extend_from_slice(&to_set[..whole_bytes]); + if overrun > 0 { + let masked = to_set[whole_bytes] & ((1 << overrun) - 1); + self.buffer.push(masked) + } + + self.len = new_len; + debug_assert_eq!(self.buffer.len(), new_buf_len); + return; + } + + for to_set_byte in &to_set[..whole_bytes] { + let low = *to_set_byte << skew; + let high = *to_set_byte >> (8 - skew); + + *self.buffer.last_mut().unwrap() |= low; + self.buffer.push(high); + } + + if overrun > 0 { + let masked = to_set[whole_bytes] & ((1 << overrun) - 1); + let low = masked << skew; + *self.buffer.last_mut().unwrap() |= low; + + if overrun > 8 - skew { + let high = masked >> (8 - skew); + self.buffer.push(high) + } + } + + self.len = new_len; + debug_assert_eq!(self.buffer.len(), new_buf_len); + } + + /// Sets a given bit + pub fn set(&mut self, idx: usize) { + let byte_idx = idx >> 3; + let bit_idx = idx & 7; + self.buffer[byte_idx] |= 1 << bit_idx; + } + + /// Returns if the given index is set + pub fn get(&self, idx: usize) -> bool { + let byte_idx = idx >> 3; + let bit_idx = idx & 7; + (self.buffer[byte_idx] >> bit_idx) & 1 != 0 + } + + /// Converts this BitSet to a buffer compatible with arrows boolean encoding + pub fn to_arrow(&self) -> Buffer { + Buffer::from(&self.buffer) + } + + /// Returns the number of values stored in the bitset + pub fn len(&self) -> usize { + self.len + } + + /// Returns the number of bytes used by this bitset + pub fn byte_len(&self) -> usize { + self.buffer.len() + } +} + +/// Returns an iterator over set bit positions in increasing order +pub fn iter_set_positions(bytes: &[u8]) -> impl Iterator + '_ { + let mut byte_idx = 0; + let mut in_progress = bytes.get(0).cloned().unwrap_or(0); + std::iter::from_fn(move || loop { + if in_progress != 0 { + let bit_pos = in_progress.trailing_zeros(); + in_progress ^= 1 << bit_pos; + return Some((byte_idx << 3) + (bit_pos as usize)); + } + byte_idx += 1; + in_progress = *bytes.get(byte_idx)?; + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_deps::arrow::array::BooleanBufferBuilder; + use rand::RngCore; + + /// Computes a compacted representation of a given bool array + fn compact_bools(bools: &[bool]) -> Vec { + bools + .chunks(8) + .map(|x| { + let mut collect = 0_u8; + for (idx, set) in x.iter().enumerate() { + if *set { + collect |= 1 << idx + } + } + collect + }) + .collect() + } + + fn iter_set_bools(bools: &[bool]) -> impl Iterator + '_ { + bools.iter().enumerate().filter_map(|(x, y)| y.then(|| x)) + } + + #[test] + fn test_compact_bools() { + let bools = &[ + false, false, true, true, false, false, true, false, true, false, + ]; + let collected = compact_bools(bools); + let indexes: Vec<_> = iter_set_bools(bools).collect(); + assert_eq!(collected.as_slice(), &[0b01001100, 0b00000001]); + assert_eq!(indexes.as_slice(), &[2, 3, 6, 8]) + } + + #[test] + fn test_bit_mask() { + let mut mask = BitSet::new(); + + mask.append_bits(8, &[0b11111111]); + let d1 = mask.buffer.clone(); + + mask.append_bits(3, &[0b01010010]); + let d2 = mask.buffer.clone(); + + mask.append_bits(5, &[0b00010100]); + let d3 = mask.buffer.clone(); + + mask.append_bits(2, &[0b11110010]); + let d4 = mask.buffer.clone(); + + mask.append_bits(15, &[0b11011010, 0b01010101]); + let d5 = mask.buffer.clone(); + + assert_eq!(d1.as_slice(), &[0b11111111]); + assert_eq!(d2.as_slice(), &[0b11111111, 0b00000010]); + assert_eq!(d3.as_slice(), &[0b11111111, 0b10100010]); + assert_eq!(d4.as_slice(), &[0b11111111, 0b10100010, 0b00000010]); + assert_eq!( + d5.as_slice(), + &[0b11111111, 0b10100010, 0b01101010, 0b01010111, 0b00000001] + ); + + assert!(mask.get(0)); + assert!(!mask.get(8)); + assert!(mask.get(9)); + assert!(mask.get(19)); + } + + #[test] + fn test_bit_mask_all_set() { + let mut mask = BitSet::new(); + let mut all_bools = vec![]; + let mut rng = rand::thread_rng(); + + for _ in 0..100 { + let mask_length = (rng.next_u32() % 50) as usize; + let bools: Vec<_> = std::iter::repeat(true).take(mask_length).collect(); + + let collected = compact_bools(&bools); + mask.append_bits(mask_length, &collected); + all_bools.extend_from_slice(&bools); + } + + let collected = compact_bools(&all_bools); + assert_eq!(mask.buffer, collected); + + let expected_indexes: Vec<_> = iter_set_bools(&all_bools).collect(); + let actual_indexes: Vec<_> = iter_set_positions(&mask.buffer).collect(); + assert_eq!(expected_indexes, actual_indexes); + } + + #[test] + fn test_bit_mask_fuzz() { + let mut mask = BitSet::new(); + let mut all_bools = vec![]; + let mut rng = rand::thread_rng(); + + for _ in 0..100 { + let mask_length = (rng.next_u32() % 50) as usize; + let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0)) + .take(mask_length) + .collect(); + + let collected = compact_bools(&bools); + mask.append_bits(mask_length, &collected); + all_bools.extend_from_slice(&bools); + } + + let collected = compact_bools(&all_bools); + assert_eq!(mask.buffer, collected); + + let expected_indexes: Vec<_> = iter_set_bools(&all_bools).collect(); + let actual_indexes: Vec<_> = iter_set_positions(&mask.buffer).collect(); + assert_eq!(expected_indexes, actual_indexes); + for index in actual_indexes { + assert!(mask.get(index)); + } + } + + #[test] + fn test_arrow_compat() { + let bools = &[ + false, false, true, true, false, false, true, false, true, false, false, true, + ]; + + let mut builder = BooleanBufferBuilder::new(bools.len()); + builder.append_slice(bools); + let buffer = builder.finish(); + + let collected = compact_bools(bools); + let mut mask = BitSet::new(); + mask.append_bits(bools.len(), &collected); + let mask_buffer = mask.to_arrow(); + + assert_eq!(collected.as_slice(), buffer.as_slice()); + assert_eq!(buffer.as_slice(), mask_buffer.as_slice()); + } +} diff --git a/mutable_buffer/src/chunk/snapshot.rs b/mutable_buffer/src/chunk/snapshot.rs index fc03846a42..d813fef1e3 100644 --- a/mutable_buffer/src/chunk/snapshot.rs +++ b/mutable_buffer/src/chunk/snapshot.rs @@ -8,6 +8,7 @@ use internal_types::selection::Selection; use snafu::{OptionExt, ResultExt, Snafu}; use super::Chunk; +use data_types::partition_metadata::Statistics; #[derive(Debug, Snafu)] pub enum Error { @@ -64,12 +65,15 @@ impl ChunkSnapshot { .lookup_value(TIME_COLUMN_NAME) .ok() .and_then(|column_id| { - table.column(column_id).ok().and_then(|column| { - // TimestampRange has an exclusive upper bound - column - .get_i64_stats() - .map(|x| TimestampRange::new(x.min, x.max + 1)) - }) + table + .column(column_id) + .ok() + .and_then(|column| match column.stats() { + Statistics::I64(stats) => { + Some(TimestampRange::new(stats.min, stats.max + 1)) + } + _ => None, + }) }); records.insert( diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs index 17cdfb6577..110e4ce32e 100644 --- a/mutable_buffer/src/column.rs +++ b/mutable_buffer/src/column.rs @@ -1,340 +1,252 @@ -use snafu::Snafu; +use snafu::{ensure, Snafu}; use crate::dictionary::{Dictionary, DID}; -use data_types::partition_metadata::StatValues; -use generated_types::entry::LogicalColumnType; -use internal_types::entry::TypedValuesIterator; +use data_types::partition_metadata::{StatValues, Statistics}; +use internal_types::entry::Column as EntryColumn; +use crate::bitset::{iter_set_positions, BitSet}; +use arrow_deps::arrow::array::{ + ArrayDataBuilder, ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, + TimestampNanosecondArray, UInt64Array, +}; +use arrow_deps::arrow::datatypes::DataType; +use internal_types::schema::{InfluxColumnType, InfluxFieldType, TIME_DATA_TYPE}; +use std::iter::FromIterator; use std::mem; +use std::sync::Arc; #[derive(Debug, Snafu)] +#[allow(missing_copy_implementations)] pub enum Error { - #[snafu(display("Don't know how to insert a column of type {}", inserted_value_type))] - UnknownColumnType { inserted_value_type: String }, - - #[snafu(display( - "Unable to insert {} type into a column of {}", - inserted_value_type, - existing_column_type - ))] + #[snafu(display("Unable to insert {} type into a column of {}", inserted, existing,))] TypeMismatch { - existing_column_type: String, - inserted_value_type: String, + existing: InfluxColumnType, + inserted: InfluxColumnType, }, - #[snafu(display("InternalError: Applying i64 range on a column with non-i64 type"))] - InternalTypeMismatchForTimePredicate, + #[snafu(display( + "Invalid null mask, expected to be {} bytes but was {}", + expected_bytes, + actual_bytes + ))] + InvalidNullMask { + expected_bytes: usize, + actual_bytes: usize, + }, } pub type Result = std::result::Result; /// Stores the actual data for columns in a chunk along with summary /// statistics -#[derive(Debug, Clone)] -pub enum Column { - F64(Vec>, StatValues), - I64(Vec>, StatValues), - U64(Vec>, StatValues), - String(Vec>, StatValues), - Bool(Vec>, StatValues), +#[derive(Debug)] +pub struct Column { + influx_type: InfluxColumnType, + valid: BitSet, + data: ColumnData, +} + +#[derive(Debug)] +pub enum ColumnData { + F64(Vec, StatValues), + I64(Vec, StatValues), + U64(Vec, StatValues), + String(Vec, StatValues), + Bool(BitSet, StatValues), Tag(Vec, StatValues), } impl Column { - /// Initializes a new column from typed values, the column on a table write - /// batch on an Entry. Will initialize the stats with the first - /// non-null value and update with any other non-null values included. - pub fn new_from_typed_values( - dictionary: &mut Dictionary, - row_count: usize, - logical_type: LogicalColumnType, - values: TypedValuesIterator<'_>, - ) -> Self { - match values { - TypedValuesIterator::String(vals) => match logical_type { - LogicalColumnType::Tag => { - let mut tag_values = vec![DID::invalid(); row_count]; - let mut stats: Option> = None; + pub fn new(row_count: usize, column_type: InfluxColumnType) -> Self { + let mut valid = BitSet::new(); + valid.append_unset(row_count); - let mut added_tag_values: Vec<_> = vals - .map(|tag| match tag { - Some(tag) => { - match stats.as_mut() { - Some(s) => StatValues::update_string(s, tag), - None => { - stats = Some(StatValues::new(tag.to_string())); - } - } - - dictionary.lookup_value_or_insert(tag) - } - None => DID::invalid(), - }) - .collect(); - - tag_values.append(&mut added_tag_values); - - Self::Tag( - tag_values, - stats.expect("can't insert tag column with no values"), - ) - } - LogicalColumnType::Field => { - let mut values = vec![None; row_count]; - let mut stats: Option> = None; - - for value in vals { - match value { - Some(v) => { - match stats.as_mut() { - Some(s) => StatValues::update_string(s, v), - None => stats = Some(StatValues::new(v.to_string())), - } - - values.push(Some(v.to_string())); - } - None => values.push(None), - } - } - - Self::String( - values, - stats.expect("can't insert string column with no values"), - ) - } - _ => panic!("unsupported!"), - }, - TypedValuesIterator::I64(vals) => { - let mut values = vec![None; row_count]; - let mut stats: Option> = None; - - for v in vals { - if let Some(val) = v { - match stats.as_mut() { - Some(s) => s.update(val), - None => stats = Some(StatValues::new(val)), - } - } - values.push(v); - } - - Self::I64( - values, - stats.expect("can't insert i64 column with no values"), - ) + let data = match column_type { + InfluxColumnType::Field(InfluxFieldType::Boolean) => { + let mut data = BitSet::new(); + data.append_unset(row_count); + ColumnData::Bool(data, StatValues::new()) } - TypedValuesIterator::F64(vals) => { - let mut values = vec![None; row_count]; - let mut stats: Option> = None; - - for v in vals { - if let Some(val) = v { - match stats.as_mut() { - Some(s) => s.update(val), - None => stats = Some(StatValues::new(val)), - } - } - values.push(v); - } - - Self::F64( - values, - stats.expect("can't insert f64 column with no values"), - ) + InfluxColumnType::Field(InfluxFieldType::UInteger) => { + ColumnData::U64(vec![0; row_count], StatValues::new()) } - TypedValuesIterator::U64(vals) => { - let mut values = vec![None; row_count]; - let mut stats: Option> = None; - - for v in vals { - if let Some(val) = v { - match stats.as_mut() { - Some(s) => s.update(val), - None => stats = Some(StatValues::new(val)), - } - } - values.push(v); - } - - Self::U64( - values, - stats.expect("can't insert u64 column with no values"), - ) + InfluxColumnType::Field(InfluxFieldType::Float) => { + ColumnData::F64(vec![0.0; row_count], StatValues::new()) } - TypedValuesIterator::Bool(vals) => { - let mut values = vec![None; row_count]; - let mut stats: Option> = None; - - for v in vals { - if let Some(val) = v { - match stats.as_mut() { - Some(s) => s.update(val), - None => stats = Some(StatValues::new(val)), - } - } - values.push(v); - } - - Self::Bool( - values, - stats.expect("can't insert bool column with no values"), - ) + InfluxColumnType::Field(InfluxFieldType::Integer) | InfluxColumnType::Timestamp => { + ColumnData::I64(vec![0; row_count], StatValues::new()) } + InfluxColumnType::Field(InfluxFieldType::String) => { + ColumnData::String(vec![String::new(); row_count], StatValues::new()) + } + InfluxColumnType::Tag => { + ColumnData::Tag(vec![DID::invalid(); row_count], StatValues::new()) + } + }; + + Self { + influx_type: column_type, + valid, + data, } } - /// Pushes typed values, the column from a table write batch on an Entry. - /// Updates statsistics for any non-null values. - pub fn push_typed_values( - &mut self, - dictionary: &mut Dictionary, - logical_type: LogicalColumnType, - values: TypedValuesIterator<'_>, - ) -> Result<()> { - match (self, values) { - (Self::Bool(col, stats), TypedValuesIterator::Bool(values)) => { - for val in values { - if let Some(v) = val { - stats.update(v) - }; - col.push(val); - } - } - (Self::I64(col, stats), TypedValuesIterator::I64(values)) => { - for val in values { - if let Some(v) = val { - stats.update(v) - }; - col.push(val); - } - } - (Self::F64(col, stats), TypedValuesIterator::F64(values)) => { - for val in values { - if let Some(v) = val { - stats.update(v) - }; - col.push(val); - } - } - (Self::U64(col, stats), TypedValuesIterator::U64(values)) => { - for val in values { - if let Some(v) = val { - stats.update(v) - }; - col.push(val); - } - } - (Self::String(col, stats), TypedValuesIterator::String(values)) => { - if logical_type != LogicalColumnType::Field { - TypeMismatch { - existing_column_type: "String", - inserted_value_type: "tag", - } - .fail()?; - } + pub fn validate_schema(&self, entry: &EntryColumn<'_>) -> Result<()> { + let entry_type = entry.influx_type(); - for val in values { - match val { - Some(v) => { - StatValues::update_string(stats, v); - col.push(Some(v.to_string())); - } - None => col.push(None), - } - } + ensure!( + entry_type == self.influx_type, + TypeMismatch { + existing: self.influx_type, + inserted: entry_type } - (Self::Tag(col, stats), TypedValuesIterator::String(values)) => { - if logical_type != LogicalColumnType::Tag { - TypeMismatch { - existing_column_type: "tag", - inserted_value_type: "String", - } - .fail()?; - } - - for val in values { - match val { - Some(v) => { - StatValues::update_string(stats, v); - let id = dictionary.lookup_value_or_insert(v); - col.push(id); - } - None => col.push(DID::invalid()), - } - } - } - (existing, values) => TypeMismatch { - existing_column_type: existing.type_description(), - inserted_value_type: values.type_description(), - } - .fail()?, - } + ); Ok(()) } - /// Pushes None values onto the column until its len is equal to that passed - /// in + pub fn influx_type(&self) -> InfluxColumnType { + self.influx_type + } + + pub fn append(&mut self, entry: &EntryColumn<'_>, dictionary: &mut Dictionary) -> Result<()> { + self.validate_schema(entry)?; + + let row_count = entry.row_count; + if row_count == 0 { + return Ok(()); + } + + let mask = construct_valid_mask(entry)?; + + match &mut self.data { + ColumnData::Bool(col_data, stats) => { + let entry_data = entry + .inner() + .values_as_bool_values() + .expect("invalid flatbuffer") + .values() + .expect("invalid payload"); + + let data_offset = col_data.len(); + col_data.append_unset(row_count); + + let initial_non_null_count = stats.count; + + for (idx, value) in iter_set_positions(&mask).zip(entry_data) { + stats.update(value); + + if *value { + col_data.set(data_offset + idx); + } + } + assert_eq!( + stats.count - initial_non_null_count, + entry_data.len() as u64 + ); + } + ColumnData::U64(col_data, stats) => { + let entry_data = entry + .inner() + .values_as_u64values() + .expect("invalid flatbuffer") + .values() + .expect("invalid payload") + .into_iter(); + + handle_write(row_count, &mask, entry_data, col_data, stats); + } + ColumnData::F64(col_data, stats) => { + let entry_data = entry + .inner() + .values_as_f64values() + .expect("invalid flatbuffer") + .values() + .expect("invalid payload") + .into_iter(); + + handle_write(row_count, &mask, entry_data, col_data, stats); + } + ColumnData::I64(col_data, stats) => { + let entry_data = entry + .inner() + .values_as_i64values() + .expect("invalid flatbuffer") + .values() + .expect("invalid payload") + .into_iter(); + + handle_write(row_count, &mask, entry_data, col_data, stats); + } + ColumnData::String(col_data, stats) => { + let entry_data = entry + .inner() + .values_as_string_values() + .expect("invalid flatbuffer") + .values() + .expect("invalid payload") + .into_iter() + .map(ToString::to_string); + + handle_write(row_count, &mask, entry_data, col_data, stats); + } + ColumnData::Tag(col_data, stats) => { + let entry_data = entry + .inner() + .values_as_string_values() + .expect("invalid flatbuffer") + .values() + .expect("invalid payload"); + + let data_offset = col_data.len(); + col_data.resize(data_offset + row_count, DID::invalid()); + + let initial_non_null_count = stats.count; + let to_add = entry_data.len(); + + for (idx, value) in iter_set_positions(&mask).zip(entry_data) { + stats.update(value); + col_data[data_offset + idx] = dictionary.lookup_value_or_insert(value); + } + + assert_eq!(stats.count - initial_non_null_count, to_add as u64); + } + }; + + self.valid.append_bits(entry.row_count, &mask); + Ok(()) + } + pub fn push_nulls_to_len(&mut self, len: usize) { - match self { - Self::Tag(vals, _) => { - if len > vals.len() { - vals.resize(len, DID::invalid()); - } - } - Self::I64(vals, _) => { - if len > vals.len() { - vals.resize(len, None); - } - } - Self::F64(vals, _) => { - if len > vals.len() { - vals.resize(len, None); - } - } - Self::U64(vals, _) => { - if len > vals.len() { - vals.resize(len, None); - } - } - Self::Bool(vals, _) => { - if len > vals.len() { - vals.resize(len, None); - } - } - Self::String(vals, _) => { - if len > vals.len() { - vals.resize(len, None); - } - } + if self.valid.len() == len { + return; + } + assert!(len > self.valid.len(), "cannot shrink column"); + let delta = len - self.valid.len(); + self.valid.append_unset(delta); + + match &mut self.data { + ColumnData::F64(data, _) => data.resize(len, 0.), + ColumnData::I64(data, _) => data.resize(len, 0), + ColumnData::U64(data, _) => data.resize(len, 0), + ColumnData::String(data, _) => data.resize(len, String::new()), + ColumnData::Bool(data, _) => data.append_unset(delta), + ColumnData::Tag(data, _) => data.resize(len, DID::invalid()), } } pub fn len(&self) -> usize { - match self { - Self::F64(v, _) => v.len(), - Self::I64(v, _) => v.len(), - Self::U64(v, _) => v.len(), - Self::String(v, _) => v.len(), - Self::Bool(v, _) => v.len(), - Self::Tag(v, _) => v.len(), - } + self.valid.len() } - pub fn type_description(&self) -> &'static str { - match self { - Self::F64(_, _) => "f64", - Self::I64(_, _) => "i64", - Self::U64(_, _) => "u64", - Self::String(_, _) => "String", - Self::Bool(_, _) => "bool", - Self::Tag(_, _) => "tag", - } - } - - pub fn get_i64_stats(&self) -> Option> { - match self { - Self::I64(_, values) => Some(values.clone()), - _ => None, + pub fn stats(&self) -> Statistics { + match &self.data { + ColumnData::F64(_, stats) => Statistics::F64(stats.clone()), + ColumnData::I64(_, stats) => Statistics::I64(stats.clone()), + ColumnData::U64(_, stats) => Statistics::U64(stats.clone()), + ColumnData::Bool(_, stats) => Statistics::Bool(stats.clone()), + ColumnData::String(_, stats) | ColumnData::Tag(_, stats) => { + Statistics::String(stats.clone()) + } } } @@ -343,27 +255,150 @@ impl Column { /// the dictionary size in the chunk that holds the table that has this /// column. The size returned here is only for their identifiers. pub fn size(&self) -> usize { - match self { - Self::F64(v, stats) => { - mem::size_of::>() * v.len() + mem::size_of_val(&stats) - } - Self::I64(v, stats) => { - mem::size_of::>() * v.len() + mem::size_of_val(&stats) - } - Self::U64(v, stats) => { - mem::size_of::>() * v.len() + mem::size_of_val(&stats) - } - Self::Bool(v, stats) => { - mem::size_of::>() * v.len() + mem::size_of_val(&stats) - } - Self::Tag(v, stats) => mem::size_of::() * v.len() + mem::size_of_val(&stats), - Self::String(v, stats) => { - let string_bytes_size = v - .iter() - .fold(0, |acc, val| acc + val.as_ref().map_or(0, |s| s.len())); - let vec_pointer_sizes = mem::size_of::>() * v.len(); + let data_size = match &self.data { + ColumnData::F64(v, stats) => mem::size_of::() * v.len() + mem::size_of_val(&stats), + ColumnData::I64(v, stats) => mem::size_of::() * v.len() + mem::size_of_val(&stats), + ColumnData::U64(v, stats) => mem::size_of::() * v.len() + mem::size_of_val(&stats), + ColumnData::Bool(v, stats) => v.byte_len() + mem::size_of_val(&stats), + ColumnData::Tag(v, stats) => mem::size_of::() * v.len() + mem::size_of_val(&stats), + ColumnData::String(v, stats) => { + let string_bytes_size = v.iter().fold(0, |acc, val| acc + val.len()); + let vec_pointer_sizes = mem::size_of::() * v.len(); string_bytes_size + vec_pointer_sizes + mem::size_of_val(&stats) } + }; + data_size + self.valid.byte_len() + } + + pub fn to_arrow(&self, dictionary: &Dictionary) -> Result { + let nulls = self.valid.to_arrow(); + let data: ArrayRef = match &self.data { + ColumnData::F64(data, _) => { + let data = ArrayDataBuilder::new(DataType::Float64) + .len(data.len()) + .add_buffer(data.iter().cloned().collect()) + .null_bit_buffer(nulls) + .build(); + Arc::new(Float64Array::from(data)) + } + ColumnData::I64(data, _) => match self.influx_type { + InfluxColumnType::Timestamp => { + let data = ArrayDataBuilder::new(TIME_DATA_TYPE()) + .len(data.len()) + .add_buffer(data.iter().cloned().collect()) + .null_bit_buffer(nulls) + .build(); + Arc::new(TimestampNanosecondArray::from(data)) + } + InfluxColumnType::Field(InfluxFieldType::Integer) => { + let data = ArrayDataBuilder::new(DataType::Int64) + .len(data.len()) + .add_buffer(data.iter().cloned().collect()) + .null_bit_buffer(nulls) + .build(); + + Arc::new(Int64Array::from(data)) + } + _ => unreachable!(), + }, + ColumnData::U64(data, _) => { + let data = ArrayDataBuilder::new(DataType::UInt64) + .len(data.len()) + .add_buffer(data.iter().cloned().collect()) + .null_bit_buffer(nulls) + .build(); + Arc::new(UInt64Array::from(data)) + } + ColumnData::String(data, _) => { + // TODO: Store this closer to the arrow representation + let iter = data + .iter() + .enumerate() + .map(|(idx, value)| self.valid.get(idx).then(|| value) as _); + + let array = StringArray::from_iter(iter); + Arc::new(array) + } + ColumnData::Bool(data, _) => { + let data = ArrayDataBuilder::new(DataType::Boolean) + .len(data.len()) + .add_buffer(data.to_arrow()) + .null_bit_buffer(nulls) + .build(); + Arc::new(BooleanArray::from(data)) + } + ColumnData::Tag(data, _) => { + // TODO: Store this closer to the arrow representation + let iter = data.iter().enumerate().map(|(idx, id)| { + self.valid.get(idx).then(|| { + dictionary + .lookup_id(*id) + .expect("dictionary had mapping for tag value") + }) + }); + + let array = StringArray::from_iter(iter); + Arc::new(array) + } + }; + + assert_eq!(data.len(), self.len()); + + Ok(data) + } +} + +/// Construct a validity mask from the given column's null mask +fn construct_valid_mask(column: &EntryColumn<'_>) -> Result> { + let buf_len = (column.row_count + 7) >> 3; + match column.inner().null_mask() { + Some(data) => { + ensure!( + data.len() == buf_len, + InvalidNullMask { + expected_bytes: buf_len, + actual_bytes: data.len() + } + ); + + Ok(data + .iter() + .map(|x| { + // Currently the bit mask is backwards + !x.reverse_bits() + }) + .collect()) + } + None => { + // If no null mask they're all valid + let mut data = Vec::new(); + data.resize(buf_len, 0xFF); + Ok(data) } } } + +/// Writes entry data into a column based on the valid mask +fn handle_write( + row_count: usize, + valid_mask: &[u8], + entry_data: E, + col_data: &mut Vec, + stats: &mut StatValues, +) where + T: Clone + Default + PartialOrd, + E: Iterator + ExactSizeIterator, +{ + let data_offset = col_data.len(); + col_data.resize(data_offset + row_count, Default::default()); + + let initial_non_null_count = stats.count; + let to_add = entry_data.len(); + + for (idx, value) in iter_set_positions(valid_mask).zip(entry_data) { + stats.update(&value); + col_data[data_offset + idx] = value; + } + + assert_eq!(stats.count - initial_non_null_count, to_add as u64); +} diff --git a/mutable_buffer/src/lib.rs b/mutable_buffer/src/lib.rs index 0fd90cc61d..211ec697bd 100644 --- a/mutable_buffer/src/lib.rs +++ b/mutable_buffer/src/lib.rs @@ -57,6 +57,7 @@ clippy::clone_on_ref_ptr )] +mod bitset; pub mod chunk; mod column; mod dictionary; diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index 65167530fa..bd8915b899 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -1,33 +1,20 @@ -use std::{cmp, collections::BTreeMap, iter::FromIterator, sync::Arc}; +use std::collections::BTreeMap; use crate::{ column, column::Column, dictionary::{Dictionary, Error as DictionaryError, DID}, }; -use data_types::{ - database_rules::WriterId, - partition_metadata::{ColumnSummary, Statistics}, -}; +use data_types::{database_rules::WriterId, partition_metadata::ColumnSummary}; use internal_types::{ entry::{self, ClockValue}, - schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME}, + schema::{builder::SchemaBuilder, Schema}, selection::Selection, }; -use snafu::{OptionExt, ResultExt, Snafu}; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; -use arrow_deps::{ - arrow, - arrow::{ - array::{ - ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, - TimestampNanosecondArray, UInt64Array, - }, - datatypes::DataType as ArrowDataType, - record_batch::RecordBatch, - }, -}; +use arrow_deps::{arrow, arrow::record_batch::RecordBatch}; #[derive(Debug, Snafu)] pub enum Error { @@ -37,30 +24,13 @@ pub enum Error { source: column::Error, }, - #[snafu(display( - "Expected column {} to be type {} but was {}", - column, - expected_column_type, - actual_column_type - ))] - ColumnTypeMismatch { + #[snafu(display("Column {} had {} rows, expected {}", column, expected, actual))] + IncorrectRowCount { column: String, - expected_column_type: String, - actual_column_type: String, + expected: usize, + actual: usize, }, - #[snafu(display( - "Expected column {} to be a tag but received it as a string field", - column - ))] - ExpectedTag { column: String }, - - #[snafu(display( - "Expected column {} to be a string field but received it as a tag", - column - ))] - ExpectedField { column: String }, - #[snafu(display("Internal error: unexpected aggregate request for None aggregate",))] InternalUnexpectedNoneAggregate {}, @@ -115,7 +85,7 @@ pub enum Error { } pub type Result = std::result::Result; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Table { /// Name of the table as a DID in the chunk dictionary pub id: DID, @@ -164,83 +134,51 @@ impl Table { _writer_id: WriterId, columns: Vec>, ) -> Result<()> { - // get the column ids and validate schema for those that already exist - let columns_with_inserts = columns - .into_iter() - .map(|insert_column| { - let column_id = dictionary.lookup_value_or_insert(insert_column.name()); - let values = insert_column.values(); - - if let Some(c) = self.columns.get(&column_id) { - match (&values, c) { - (entry::TypedValuesIterator::Bool(_), Column::Bool(_, _)) => (), - (entry::TypedValuesIterator::U64(_), Column::U64(_, _)) => (), - (entry::TypedValuesIterator::F64(_), Column::F64(_, _)) => (), - (entry::TypedValuesIterator::I64(_), Column::I64(_, _)) => (), - (entry::TypedValuesIterator::String(_), Column::String(_, _)) => { - if !insert_column.is_field() { - ExpectedField { - column: insert_column.name(), - } - .fail()? - }; - } - (entry::TypedValuesIterator::String(_), Column::Tag(_, _)) => { - if !insert_column.is_tag() { - ExpectedTag { - column: insert_column.name(), - } - .fail()? - }; - } - _ => ColumnTypeMismatch { - column: insert_column.name(), - expected_column_type: c.type_description(), - actual_column_type: values.type_description(), - } - .fail()?, - } - } - - Ok((column_id, insert_column.logical_type(), values)) - }) - .collect::>>()?; - let row_count_before_insert = self.row_count(); + let additional_rows = columns.first().map(|x| x.row_count).unwrap_or_default(); + let final_row_count = row_count_before_insert + additional_rows; - for (column_id, logical_type, values) in columns_with_inserts.into_iter() { - match self.columns.get_mut(&column_id) { - Some(c) => c - .push_typed_values(dictionary, logical_type, values) - .with_context(|| { - let column = dictionary - .lookup_id(column_id) - .expect("column name must be present in dictionary"); - ColumnError { column } - })?, - None => { - self.columns.insert( - column_id, - Column::new_from_typed_values( - dictionary, - row_count_before_insert, - logical_type, - values, - ), - ); + // get the column ids and validate schema for those that already exist + let column_ids = columns + .iter() + .map(|column| { + ensure!( + column.row_count == additional_rows, + IncorrectRowCount { + column: column.name(), + expected: additional_rows, + actual: column.row_count, + } + ); + + let id = dictionary.lookup_value_or_insert(column.name()); + if let Some(c) = self.columns.get(&id) { + c.validate_schema(&column).context(ColumnError { + column: column.name(), + })?; } - } + + Ok(id) + }) + .collect::, _>>()?; + + for (fb_column, column_id) in columns.into_iter().zip(column_ids.into_iter()) { + let influx_type = fb_column.influx_type(); + + let column = self + .columns + .entry(column_id) + .or_insert_with(|| Column::new(row_count_before_insert, influx_type)); + + column.append(&fb_column, dictionary).context(ColumnError { + column: fb_column.name(), + })?; + + assert_eq!(column.len(), final_row_count); } - // ensure all columns have the same number of rows as the one with the most. - // This adds nulls to the columns that weren't included in this write - let max_row_count = self - .columns - .values() - .fold(row_count_before_insert, |max, col| cmp::max(max, col.len())); - for c in self.columns.values_mut() { - c.push_nulls_to_len(max_row_count); + c.push_nulls_to_len(final_row_count); } Ok(()) @@ -324,27 +262,10 @@ impl Table { /// Returns the Schema of this table fn schema_impl(&self, selection: &TableColSelection<'_>) -> Result { let mut schema_builder = SchemaBuilder::new(); - for col in &selection.cols { - let column_name = col.column_name; let column = self.column(col.column_id)?; - - schema_builder = match column { - Column::String(_, _) => schema_builder.field(column_name, ArrowDataType::Utf8), - Column::Tag(_, _) => schema_builder.tag(column_name), - Column::F64(_, _) => schema_builder.field(column_name, ArrowDataType::Float64), - Column::I64(_, _) => { - if column_name == TIME_COLUMN_NAME { - schema_builder.timestamp() - } else { - schema_builder.field(column_name, ArrowDataType::Int64) - } - } - Column::U64(_, _) => schema_builder.field(column_name, ArrowDataType::UInt64), - Column::Bool(_, _) => schema_builder.field(column_name, ArrowDataType::Boolean), - }; + schema_builder = schema_builder.influx_column(col.column_name, column.influx_type()); } - schema_builder.build().context(InternalSchema) } @@ -356,60 +277,18 @@ impl Table { dictionary: &Dictionary, selection: &TableColSelection<'_>, ) -> Result { - let mut columns = Vec::with_capacity(selection.cols.len()); - - for col in &selection.cols { - let column = self.column(col.column_id)?; - - let array: ArrayRef = match column { - Column::String(vals, _) => { - let iter = vals.iter().map(|s| s.as_deref()); - let array = StringArray::from_iter(iter); - Arc::new(array) - } - Column::Tag(vals, _) => { - let iter = vals.iter().map(|id| { - if *id == DID::invalid() { - return None; - } - Some( - dictionary - .lookup_id(*id) - .expect("dictionary had mapping for tag value"), - ) - }); - - let array = StringArray::from_iter(iter); - Arc::new(array) - } - Column::F64(vals, _) => { - let array = Float64Array::from_iter(vals.iter()); - Arc::new(array) - } - Column::I64(vals, _) => { - if col.column_name == TIME_COLUMN_NAME { - let array = TimestampNanosecondArray::from_iter(vals.iter()); - Arc::new(array) - } else { - let array = Int64Array::from_iter(vals.iter()); - Arc::new(array) - } - } - Column::U64(vals, _) => { - let array = UInt64Array::from_iter(vals.iter()); - Arc::new(array) - } - Column::Bool(vals, _) => { - let array = BooleanArray::from_iter(vals.iter()); - Arc::new(array) - } - }; - - columns.push(array); - } + let columns = selection + .cols + .iter() + .map(|col| { + let column = self.column(col.column_id)?; + column.to_arrow(dictionary).context(ColumnError { + column: col.column_name, + }) + }) + .collect::>>()?; let schema = self.schema_impl(selection)?.into(); - RecordBatch::try_new(schema, columns).context(ArrowError {}) } @@ -421,19 +300,9 @@ impl Table { .lookup_id(*column_id) .expect("column name in dictionary"); - let stats = match c { - Column::F64(_, stats) => Statistics::F64(stats.clone()), - Column::I64(_, stats) => Statistics::I64(stats.clone()), - Column::U64(_, stats) => Statistics::U64(stats.clone()), - Column::Bool(_, stats) => Statistics::Bool(stats.clone()), - Column::String(_, stats) | Column::Tag(_, stats) => { - Statistics::String(stats.clone()) - } - }; - ColumnSummary { name: column_name.to_string(), - stats, + stats: c.stats(), } }) .collect() @@ -461,7 +330,9 @@ impl<'a> TableColSelection<'a> { #[cfg(test)] mod tests { + use arrow::datatypes::DataType as ArrowDataType; use internal_types::entry::test_helpers::lp_to_entry; + use internal_types::schema::{InfluxColumnType, InfluxFieldType}; use super::*; @@ -476,15 +347,15 @@ mod tests { ]; write_lines_to_table(&mut table, &mut dictionary, lp_lines.clone()); - assert_eq!(112, table.size()); + assert_eq!(84, table.size()); // doesn't double because of the stats overhead write_lines_to_table(&mut table, &mut dictionary, lp_lines.clone()); - assert_eq!(192, table.size()); + assert_eq!(132, table.size()); // now make sure it increased by the same amount minus stats overhead write_lines_to_table(&mut table, &mut dictionary, lp_lines); - assert_eq!(272, table.size()); + assert_eq!(180, table.size()); } #[test] @@ -588,8 +459,12 @@ mod tests { assert!( matches!( &response, - Error::ExpectedTag { + Error::ColumnError { column, + source: column::Error::TypeMismatch { + existing: InfluxColumnType::Tag, + inserted: InfluxColumnType::Field(InfluxFieldType::String) + } } if column == "t1" ), "didn't match returned error: {:?}", @@ -618,13 +493,13 @@ mod tests { assert!( matches!( &response, - Error::ColumnTypeMismatch { - expected_column_type, - actual_column_type, - column - } if expected_column_type == "i64" - && actual_column_type == "u64" - && column == "iv" + Error::ColumnError { + column, + source: column::Error::TypeMismatch { + inserted: InfluxColumnType::Field(InfluxFieldType::UInteger), + existing: InfluxColumnType::Field(InfluxFieldType::Integer) + } + } if column == "iv" ), "didn't match returned error: {:?}", response @@ -652,13 +527,13 @@ mod tests { assert!( matches!( &response, - Error::ColumnTypeMismatch { - expected_column_type, - actual_column_type, - column - } if expected_column_type == "f64" - && actual_column_type == "i64" - && column == "fv" + Error::ColumnError { + column, + source: column::Error::TypeMismatch { + existing: InfluxColumnType::Field(InfluxFieldType::Float), + inserted: InfluxColumnType::Field(InfluxFieldType::Integer) + } + } if column == "fv" ), "didn't match returned error: {:?}", response @@ -686,13 +561,13 @@ mod tests { assert!( matches!( &response, - Error::ColumnTypeMismatch { - expected_column_type, - actual_column_type, - column - } if expected_column_type == "bool" - && actual_column_type == "f64" - && column == "bv" + Error::ColumnError { + column, + source: column::Error::TypeMismatch { + existing: InfluxColumnType::Field(InfluxFieldType::Boolean), + inserted: InfluxColumnType::Field(InfluxFieldType::Float) + } + } if column == "bv" ), "didn't match returned error: {:?}", response @@ -720,13 +595,13 @@ mod tests { assert!( matches!( &response, - Error::ColumnTypeMismatch { - expected_column_type, - actual_column_type, - column - } if expected_column_type == "String" - && actual_column_type == "bool" - && column == "sv" + Error::ColumnError { + column, + source: column::Error::TypeMismatch { + existing: InfluxColumnType::Field(InfluxFieldType::String), + inserted: InfluxColumnType::Field(InfluxFieldType::Boolean), + } + } if column == "sv" ), "didn't match returned error: {:?}", response @@ -754,8 +629,12 @@ mod tests { assert!( matches!( &response, - Error::ExpectedField { - column + Error::ColumnError { + column, + source: column::Error::TypeMismatch { + existing: InfluxColumnType::Field(InfluxFieldType::String), + inserted: InfluxColumnType::Tag, + } } if column == "sv" ), "didn't match returned error: {:?}", diff --git a/server/src/db.rs b/server/src/db.rs index 8d58a24617..775380594c 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1599,7 +1599,7 @@ mod tests { to_arc("cpu"), 0, ChunkStorage::OpenMutableBuffer, - 127, + 106, )]; let size: usize = db @@ -1711,21 +1711,21 @@ mod tests { to_arc("cpu"), 1, ChunkStorage::OpenMutableBuffer, - 121, + 100, ), ChunkSummary::new_without_timestamps( to_arc("1970-01-05T15"), to_arc("cpu"), 0, ChunkStorage::ClosedMutableBuffer, - 157, + 129, ), ChunkSummary::new_without_timestamps( to_arc("1970-01-05T15"), to_arc("cpu"), 1, ChunkStorage::OpenMutableBuffer, - 159, + 131, ), ]; @@ -1735,7 +1735,7 @@ mod tests { expected, chunk_summaries ); - assert_eq!(db.memory_registries.mutable_buffer.bytes(), 121 + 157 + 159); + assert_eq!(db.memory_registries.mutable_buffer.bytes(), 100 + 129 + 131); assert_eq!(db.memory_registries.read_buffer.bytes(), 1213); } diff --git a/server/src/db/system_tables.rs b/server/src/db/system_tables.rs index 133f45c4d8..f411cc7cb2 100644 --- a/server/src/db/system_tables.rs +++ b/server/src/db/system_tables.rs @@ -306,11 +306,11 @@ mod tests { columns: vec![ ColumnSummary { name: "c1".to_string(), - stats: Statistics::I64(StatValues::new(23)), + stats: Statistics::I64(StatValues::new_with_value(23)), }, ColumnSummary { name: "c2".to_string(), - stats: Statistics::I64(StatValues::new(43)), + stats: Statistics::I64(StatValues::new_with_value(43)), }, ], }], diff --git a/server/src/query_tests/sql.rs b/server/src/query_tests/sql.rs index 05ea2cbff9..0b1428157c 100644 --- a/server/src/query_tests/sql.rs +++ b/server/src/query_tests/sql.rs @@ -268,8 +268,8 @@ async fn sql_select_from_system_chunks() { "+----+---------------+------------+-------------------+-----------------+", "| id | partition_key | table_name | storage | estimated_bytes |", "+----+---------------+------------+-------------------+-----------------+", - "| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 324 |", - "| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 264 |", + "| 0 | 1970-01-01T00 | h2o | OpenMutableBuffer | 257 |", + "| 0 | 1970-01-01T00 | o2 | OpenMutableBuffer | 221 |", "+----+---------------+------------+-------------------+-----------------+", ]; run_sql_test_case!( diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index e8dd518ae5..1186f92b70 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -278,7 +278,7 @@ async fn test_chunk_get() { table_name: "cpu".into(), id: 0, storage: ChunkStorage::OpenMutableBuffer as i32, - estimated_bytes: 161, + estimated_bytes: 132, time_of_first_write: None, time_of_last_write: None, time_closing: None, @@ -288,7 +288,7 @@ async fn test_chunk_get() { table_name: "disk".into(), id: 0, storage: ChunkStorage::OpenMutableBuffer as i32, - estimated_bytes: 127, + estimated_bytes: 114, time_of_first_write: None, time_of_last_write: None, time_closing: None, @@ -455,7 +455,7 @@ async fn test_list_partition_chunks() { table_name: "cpu".into(), id: 0, storage: ChunkStorage::OpenMutableBuffer as i32, - estimated_bytes: 161, + estimated_bytes: 132, time_of_first_write: None, time_of_last_write: None, time_closing: None, diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs index b9c8c52149..b0623aa1a3 100644 --- a/tests/end_to_end_cases/management_cli.rs +++ b/tests/end_to_end_cases/management_cli.rs @@ -191,7 +191,7 @@ async fn test_get_chunks() { .and(predicate::str::contains( r#""storage": "OpenMutableBuffer","#, )) - .and(predicate::str::contains(r#""estimated_bytes": 161"#)) + .and(predicate::str::contains(r#""estimated_bytes": 132"#)) // Check for a non empty timestamp such as // "time_of_first_write": "2021-03-30T17:11:10.723866Z", .and(predicate::str::contains(r#""time_of_first_write": "20"#));