From c3019a91bd78f8e4d3d0e9d5c9909a54e8188b78 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 8 Jan 2021 12:37:11 +0000 Subject: [PATCH] feat: add support for determining logical column types --- read_buffer/src/column.rs | 23 ++++++++++++++++++ read_buffer/src/row_group.rs | 25 ++++++++++++++++---- read_buffer/src/table.rs | 46 +++++++++++++++++++++++++++++------- 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 2b20b6a0cf..0ca4754928 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -69,6 +69,18 @@ impl Column { } } + /// Returns the logical data-type associated with the column. + pub fn logical_datatype(&self) -> LogicalDataType { + match self { + Column::String(_, _) => LogicalDataType::String, + Column::Float(_, _) => LogicalDataType::Float, + Column::Integer(_, _) => LogicalDataType::Integer, + Column::Unsigned(_, _) => LogicalDataType::Unsigned, + Column::Bool => LogicalDataType::Boolean, + Column::ByteArray(_, _) => LogicalDataType::Binary, + } + } + pub fn size(&self) -> u64 { 0 } @@ -600,6 +612,17 @@ pub struct ColumnProperties { pub has_pre_computed_row_ids: bool, } +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +/// The logical data-type for a column. +pub enum LogicalDataType { + Integer, // Signed integer + Unsigned, // Unsigned integer + Float, // + String, // UTF-8 valid string + Binary, // Arbitrary collection of bytes + Boolean, // +} + #[derive(Default, Debug, PartialEq)] // The meta-data for a column pub struct MetaData diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 5700f51bc4..f055382e6f 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -7,9 +7,9 @@ use arrow_deps::arrow; use arrow_deps::arrow::record_batch::RecordBatch; use crate::column::{ - cmp::Operator, AggregateResult, AggregateType, Column, EncodedValues, OwnedValue, RowIDs, - RowIDsOption, Scalar, Value, Values, ValuesIterator, FIELD_COLUMN_TYPE, TAG_COLUMN_TYPE, - TIME_COLUMN_TYPE, + self, cmp::Operator, AggregateResult, AggregateType, Column, EncodedValues, LogicalDataType, + OwnedValue, RowIDs, RowIDsOption, Scalar, Value, Values, ValuesIterator, FIELD_COLUMN_TYPE, + TAG_COLUMN_TYPE, TIME_COLUMN_TYPE, }; /// The name used for a timestamp column. @@ -48,6 +48,7 @@ impl RowGroup { ColumnType::Tag(c) => { assert_eq!(c.num_rows(), rows); + meta.column_types.insert(name.clone(), c.logical_datatype()); meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); all_columns_by_name.insert(name.clone(), all_columns.len()); @@ -57,6 +58,7 @@ impl RowGroup { ColumnType::Field(c) => { assert_eq!(c.num_rows(), rows); + meta.column_types.insert(name.clone(), c.logical_datatype()); meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); all_columns_by_name.insert(name.clone(), all_columns.len()); @@ -75,6 +77,7 @@ impl RowGroup { Some((_, _)) => unreachable!("unexpected types for time range"), }; + meta.column_types.insert(name.clone(), c.logical_datatype()); meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); @@ -85,6 +88,12 @@ impl RowGroup { } } + // Meta data should have same columns for types and ranges. + assert_eq!( + meta.column_types.keys().collect::>(), + meta.column_ranges.keys().collect::>(), + ); + Self { meta, columns: all_columns, @@ -111,6 +120,11 @@ impl RowGroup { &self.meta.column_ranges } + /// The logical data-type of each column. + pub fn column_logical_types(&self) -> &BTreeMap { + &self.meta.column_types + } + // Returns a reference to a column from the column name. // // It is the caller's responsibility to ensure the column exists in the read @@ -953,7 +967,7 @@ impl Ord for GroupKey<'_> { // A representation of a column name. pub type ColumnName<'a> = &'a str; -/// The logical type that a column could have. +/// The InfluxDB-specific semantic meaning of a column. pub enum ColumnType { Tag(Column), Field(Column), @@ -987,6 +1001,9 @@ struct MetaData { // possibly match based on the range of values a column has. column_ranges: BTreeMap, + // The logical column types for each column in the `RowGroup`. + column_types: BTreeMap, + // The total time range of this table spanning all of the `RowGroup`s within // the table. // diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index db7a9b759b..8cf466a8ef 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -2,7 +2,10 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Display; use std::slice::Iter; -use crate::row_group::{ColumnName, GroupKey, Predicate, RowGroup}; +use crate::{ + column, + row_group::{ColumnName, GroupKey, Predicate, RowGroup}, +}; use crate::{ column::{AggregateResult, AggregateType, OwnedValue, Scalar, Value}, row_group::{ReadFilterResult, ReadGroupResult}, @@ -94,6 +97,11 @@ impl Table { todo!() } + /// The logical data-type of each column in the `Table`'s schema. + pub fn column_logical_types(&self) -> &BTreeMap { + &self.meta.column_types + } + // Determines if schema contains all the provided column names. fn has_all_columns(&self, names: &[ColumnName<'_>]) -> bool { for &name in names { @@ -451,6 +459,9 @@ struct MetaData { // possibly match based on the range of values a column has. column_ranges: BTreeMap, + // The `ReadBuffer` logical types associated with the columns in the table + column_types: BTreeMap, + // The total time range of this table spanning all of the row groups within // the table. // @@ -460,16 +471,21 @@ struct MetaData { } impl MetaData { - pub fn new(segment: &RowGroup) -> Self { + pub fn new(rg: &RowGroup) -> Self { Self { - size: segment.size(), - rows: u64::from(segment.rows()), - column_ranges: segment + size: rg.size(), + rows: u64::from(rg.rows()), + column_ranges: rg .column_ranges() .iter() .map(|(k, v)| (k.to_string(), (v.0.clone(), v.1.clone()))) .collect(), - time_range: Some(segment.time_range()), + column_types: rg + .column_logical_types() + .iter() + .map(|(k, v)| (k.clone(), *v)) + .collect(), + time_range: Some(rg.time_range()), } } @@ -478,6 +494,10 @@ impl MetaData { self.size += rg.size(); self.rows += u64::from(rg.rows()); + // The incoming row group must have the same schema as the existing row + // groups in the table. + assert_eq!(&self.column_types, rg.column_logical_types()); + assert_eq!(self.column_ranges.len(), rg.column_ranges().len()); for (column_name, (column_range_min, column_range_max)) in rg.column_ranges() { let mut curr_range = self @@ -579,7 +599,7 @@ impl std::fmt::Display for ReadGroupResults<'_, '_> { #[cfg(test)] mod test { use super::*; - use crate::column::{cmp::Operator, Column}; + use crate::column::{cmp::Operator, Column, LogicalDataType}; use crate::row_group::{ColumnType, TIME_COLUMN_NAME}; fn build_predicates( @@ -617,9 +637,17 @@ mod test { let fc = ColumnType::Field(Column::from(&[100_u64, 101, 200, 203, 203, 10][..])); columns.insert("count".to_string(), fc); - let segment = RowGroup::new(6, columns); + let rg = RowGroup::new(6, columns); - let mut table = Table::new("cpu".to_owned(), segment); + let mut table = Table::new("cpu".to_owned(), rg); + let exp_col_types = vec![ + ("region".to_owned(), LogicalDataType::String), + ("count".to_owned(), LogicalDataType::Unsigned), + ("time".to_owned(), LogicalDataType::Integer), + ] + .into_iter() + .collect::>(); + assert_eq!(table.column_logical_types(), &exp_col_types); // Build another segment. let mut columns = BTreeMap::new();