From d805ce6189f8a6fc49911a478a24101bba9f7c51 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 18 Jan 2021 12:05:56 +0000 Subject: [PATCH 01/11] refactor: move LogicalDataType into Schema --- read_buffer/src/column.rs | 28 +++------------------------- read_buffer/src/row_group.rs | 8 ++++---- read_buffer/src/schema.rs | 28 +++++++++++++++++++++++++++- read_buffer/src/table.rs | 15 ++++++--------- 4 files changed, 40 insertions(+), 39 deletions(-) diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 44a99eb7f1..8ad9e45f9b 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -9,9 +9,11 @@ use std::sync::Arc; use arrow::array; use croaring::Bitmap; +use either::Either; use arrow_deps::{arrow, arrow::array::Array}; -use either::Either; + +use crate::schema::LogicalDataType; // Edd's totally made up magic constant. This determines whether we would use // a run-length encoded dictionary encoding or just a plain dictionary encoding. @@ -609,30 +611,6 @@ pub struct ColumnProperties { pub has_pre_computed_row_ids: bool, } -#[derive(Debug, Copy, Clone, Eq, PartialEq)] -/// The logical data-type for a column. -pub enum LogicalDataType { - Integer, // Signed integer - Unsigned, // Unsigned integer - Float, // - String, // UTF-8 valid string - Binary, // Arbitrary collection of bytes - Boolean, // -} - -impl LogicalDataType { - pub fn to_arrow_datatype(&self) -> arrow::datatypes::DataType { - match &self { - LogicalDataType::Integer => arrow::datatypes::DataType::Int64, - LogicalDataType::Unsigned => arrow::datatypes::DataType::UInt64, - LogicalDataType::Float => arrow::datatypes::DataType::Float64, - LogicalDataType::String => arrow::datatypes::DataType::Utf8, - LogicalDataType::Binary => arrow::datatypes::DataType::Binary, - LogicalDataType::Boolean => arrow::datatypes::DataType::Boolean, - } - } -} - #[derive(Default, Debug, PartialEq)] // The meta-data for a column pub struct MetaData diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 6db3287312..6d0e95acde 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -9,10 +9,10 @@ use hashbrown::{hash_map, HashMap}; use itertools::Itertools; use crate::column::{ - self, cmp::Operator, AggregateResult, AggregateType, Column, EncodedValues, LogicalDataType, - OwnedValue, RowIDs, RowIDsOption, Scalar, Value, Values, ValuesIterator, + cmp::Operator, AggregateResult, AggregateType, Column, EncodedValues, OwnedValue, RowIDs, + RowIDsOption, Scalar, Value, Values, ValuesIterator, }; -use crate::schema::ResultSchema; +use crate::schema::{LogicalDataType, ResultSchema}; use arrow_deps::arrow::record_batch::RecordBatch; use arrow_deps::{ arrow, datafusion::logical_plan::Expr as DfExpr, @@ -129,7 +129,7 @@ impl RowGroup { } /// The logical data-type of each column. - pub fn column_logical_types(&self) -> &BTreeMap { + pub fn column_logical_types(&self) -> &BTreeMap { &self.meta.column_types } diff --git a/read_buffer/src/schema.rs b/read_buffer/src/schema.rs index aa7186251d..9b56dcb609 100644 --- a/read_buffer/src/schema.rs +++ b/read_buffer/src/schema.rs @@ -1,6 +1,8 @@ use std::fmt::Display; -use crate::{column::LogicalDataType, AggregateType}; +use arrow_deps::arrow; + +use crate::AggregateType; /// A schema that is used to track the names and semantics of columns returned /// in results out of various operations on a row group. @@ -61,3 +63,27 @@ impl Display for ResultSchema { writeln!(f) } } + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +/// The logical data-type for a column. +pub enum LogicalDataType { + Integer, // Signed integer + Unsigned, // Unsigned integer + Float, // + String, // UTF-8 valid string + Binary, // Arbitrary collection of bytes + Boolean, // +} + +impl LogicalDataType { + pub fn to_arrow_datatype(&self) -> arrow::datatypes::DataType { + match &self { + LogicalDataType::Integer => arrow::datatypes::DataType::Int64, + LogicalDataType::Unsigned => arrow::datatypes::DataType::UInt64, + LogicalDataType::Float => arrow::datatypes::DataType::Float64, + LogicalDataType::String => arrow::datatypes::DataType::Utf8, + LogicalDataType::Binary => arrow::datatypes::DataType::Binary, + LogicalDataType::Boolean => arrow::datatypes::DataType::Boolean, + } + } +} diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 6ff9a02aeb..7c8288a0cf 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -3,12 +3,8 @@ use std::fmt::Display; use std::slice::Iter; use crate::column::{AggregateResult, AggregateType, OwnedValue, Scalar, Value}; -use crate::schema::ResultSchema; -use crate::{ - column, - column::LogicalDataType, - row_group::{self, ColumnName, GroupKey, Predicate, RowGroup}, -}; +use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup}; +use crate::schema::{LogicalDataType, ResultSchema}; /// A Table represents data for a single measurement. /// @@ -97,7 +93,7 @@ impl Table { } /// The logical data-type of each column in the `Table`'s schema. - pub fn column_logical_types(&self) -> &BTreeMap { + pub fn column_logical_types(&self) -> &BTreeMap { &self.meta.column_types } @@ -427,7 +423,7 @@ struct MetaData { column_ranges: BTreeMap, // The `ReadBuffer` logical types associated with the columns in the table - column_types: BTreeMap, + column_types: BTreeMap, column_names: Vec, @@ -724,8 +720,9 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> { #[cfg(test)] mod test { use super::*; - use crate::column::{Column, LogicalDataType}; + use crate::column::Column; use crate::row_group::{BinaryExpr, ColumnType, ReadAggregateResult}; + use crate::schema::LogicalDataType; #[test] fn select() { From 17358589ed012656fad20c6cb5bc3d7e63fa171b Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 18 Jan 2021 12:11:10 +0000 Subject: [PATCH 02/11] refactor: move AggregateType to schema --- read_buffer/src/chunk.rs | 3 ++- read_buffer/src/column.rs | 35 +---------------------------------- read_buffer/src/lib.rs | 2 +- read_buffer/src/row_group.rs | 6 +++--- read_buffer/src/schema.rs | 35 +++++++++++++++++++++++++++++++++-- read_buffer/src/table.rs | 4 ++-- 6 files changed, 42 insertions(+), 43 deletions(-) diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 943b448f04..01f614808b 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -1,10 +1,11 @@ use std::collections::{btree_map::Entry, BTreeMap, BTreeSet}; +use crate::row_group::RowGroup; use crate::row_group::{ColumnName, Predicate}; +use crate::schema::AggregateType; use crate::table; use crate::table::{ColumnSelection, Table}; use crate::Error; -use crate::{column::AggregateType, row_group::RowGroup}; type TableName = String; diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 8ad9e45f9b..9677ab4124 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -13,7 +13,7 @@ use either::Either; use arrow_deps::{arrow, arrow::array::Array}; -use crate::schema::LogicalDataType; +use crate::schema::{AggregateType, LogicalDataType}; // Edd's totally made up magic constant. This determines whether we would use // a run-length encoded dictionary encoding or just a plain dictionary encoding. @@ -2180,39 +2180,6 @@ impl From<&arrow::array::Float64Array> for Column { } } -/// These variants describe supported aggregates that can applied to columnar -/// data. -#[derive(Copy, Clone, PartialEq, Debug)] -pub enum AggregateType { - Count, - First, - Last, - Min, - Max, - Sum, - /* TODO - support: - * Distinct - (edd): not sure this counts as an aggregations. Seems more like a special - * filter. CountDistinct - * Percentile */ -} - -impl std::fmt::Display for AggregateType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}", - match self { - AggregateType::Count => "count", - AggregateType::First => "first", - AggregateType::Last => "last", - AggregateType::Min => "min", - AggregateType::Max => "max", - AggregateType::Sum => "sum", - } - ) - } -} - /// These variants hold aggregates, which are the results of applying aggregates /// to column data. #[derive(Debug, Copy, Clone)] diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index 2e5f9fb1ab..8e2febc4ba 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -22,8 +22,8 @@ use arrow_deps::arrow::{ use snafu::{ResultExt, Snafu}; // Identifiers that are exported as part of the public API. -pub use column::AggregateType; pub use row_group::{BinaryExpr, Predicate}; +pub use schema::*; pub use table::ColumnSelection; use chunk::Chunk; diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 6d0e95acde..7c5f2f04df 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -9,10 +9,10 @@ use hashbrown::{hash_map, HashMap}; use itertools::Itertools; use crate::column::{ - cmp::Operator, AggregateResult, AggregateType, Column, EncodedValues, OwnedValue, RowIDs, - RowIDsOption, Scalar, Value, Values, ValuesIterator, + cmp::Operator, AggregateResult, Column, EncodedValues, OwnedValue, RowIDs, RowIDsOption, + Scalar, Value, Values, ValuesIterator, }; -use crate::schema::{LogicalDataType, ResultSchema}; +use crate::schema::{AggregateType, LogicalDataType, ResultSchema}; use arrow_deps::arrow::record_batch::RecordBatch; use arrow_deps::{ arrow, datafusion::logical_plan::Expr as DfExpr, diff --git a/read_buffer/src/schema.rs b/read_buffer/src/schema.rs index 9b56dcb609..d132998786 100644 --- a/read_buffer/src/schema.rs +++ b/read_buffer/src/schema.rs @@ -2,8 +2,6 @@ use std::fmt::Display; use arrow_deps::arrow; -use crate::AggregateType; - /// A schema that is used to track the names and semantics of columns returned /// in results out of various operations on a row group. /// @@ -87,3 +85,36 @@ impl LogicalDataType { } } } + +/// These variants describe supported aggregates that can applied to columnar +/// data in the Read Buffer. +#[derive(Copy, Clone, PartialEq, Debug)] +pub enum AggregateType { + Count, + First, + Last, + Min, + Max, + Sum, + /* TODO - support: + * Distinct - (edd): not sure this counts as an aggregations. Seems more like a special + * filter. CountDistinct + * Percentile */ +} + +impl std::fmt::Display for AggregateType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + AggregateType::Count => "count", + AggregateType::First => "first", + AggregateType::Last => "last", + AggregateType::Min => "min", + AggregateType::Max => "max", + AggregateType::Sum => "sum", + } + ) + } +} diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 7c8288a0cf..689e61f767 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -2,9 +2,9 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Display; use std::slice::Iter; -use crate::column::{AggregateResult, AggregateType, OwnedValue, Scalar, Value}; +use crate::column::{AggregateResult, OwnedValue, Scalar, Value}; use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup}; -use crate::schema::{LogicalDataType, ResultSchema}; +use crate::schema::{AggregateType, LogicalDataType, ResultSchema}; /// A Table represents data for a single measurement. /// From 71fce96b3b93e0804326fde54eb30d2092e3d693 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 18 Jan 2021 13:20:31 +0000 Subject: [PATCH 03/11] feat: encapsulate semantic column type in result schema --- read_buffer/src/row_group.rs | 76 +++++++++++++++------- read_buffer/src/schema.rs | 69 +++++++++++++++++--- read_buffer/src/table.rs | 119 ++++++++++++++++++++++------------- 3 files changed, 187 insertions(+), 77 deletions(-) diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 7c5f2f04df..25dc16191d 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -56,7 +56,8 @@ impl RowGroup { ColumnType::Tag(c) => { assert_eq!(c.num_rows(), rows); - meta.column_types.insert(name.clone(), c.logical_datatype()); + meta.column_data_types + .insert(name.clone(), c.logical_datatype()); meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); all_columns_by_name.insert(name.clone(), all_columns.len()); @@ -66,7 +67,8 @@ impl RowGroup { ColumnType::Field(c) => { assert_eq!(c.num_rows(), rows); - meta.column_types.insert(name.clone(), c.logical_datatype()); + meta.column_data_types + .insert(name.clone(), c.logical_datatype()); meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); all_columns_by_name.insert(name.clone(), all_columns.len()); @@ -85,7 +87,8 @@ impl RowGroup { Some((_, _)) => unreachable!("unexpected types for time range"), }; - meta.column_types.insert(name.clone(), c.logical_datatype()); + meta.column_data_types + .insert(name.clone(), c.logical_datatype()); meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); @@ -98,7 +101,7 @@ impl RowGroup { // Meta data should have same columns for types and ranges. assert_eq!( - meta.column_types.keys().collect::>(), + meta.column_data_types.keys().collect::>(), meta.column_ranges.keys().collect::>(), ); @@ -128,11 +131,16 @@ impl RowGroup { &self.meta.column_ranges } - /// The logical data-type of each column. - pub fn column_logical_types(&self) -> &BTreeMap { + /// The semantic types of each column. + pub fn column_types(&self) -> &BTreeMap { &self.meta.column_types } + /// The logical data-types of each column. + pub fn column_logical_types(&self) -> &BTreeMap { + &self.meta.column_data_types + } + // Returns a reference to a column from the column name. // // It is the caller's responsibility to ensure the column exists in the read @@ -415,8 +423,8 @@ impl RowGroup { // Materialise values in aggregate columns. let mut aggregate_columns_data = Vec::with_capacity(agg_cols_num); - for (name, agg_type, _) in &result.schema.aggregate_columns { - let col = self.column_by_name(name); + for (col_type, agg_type, _) in &result.schema.aggregate_columns { + let col = self.column_by_name(col_type.as_str()); // TODO(edd): this materialises a column per aggregate. If there are // multiple aggregates for the same column then this will @@ -644,13 +652,17 @@ impl RowGroup { .schema .aggregate_columns .iter() - .map(|(name, typ, _)| (self.column_by_name(name), *typ)) + .map(|(col_type, agg_type, _)| (self.column_by_name(col_type.as_str()), *agg_type)) .collect::>(); let encoded_groups = dst .schema .group_column_names_iter() - .map(|name| self.column_by_name(name).grouped_row_ids().unwrap_left()) + .map(|col_type| { + self.column_by_name(col_type.as_str()) + .grouped_row_ids() + .unwrap_left() + }) .collect::>(); // multi_cartesian_product will create the cartesian product of all @@ -1210,8 +1222,11 @@ struct MetaData { // possibly match based on the range of values a column has. column_ranges: BTreeMap, - // The logical column types for each column in the `RowGroup`. - column_types: BTreeMap, + // The semantic type of the columns in the row group + column_types: BTreeMap, + + // The logical data type of the columns in the row group. + column_data_types: BTreeMap, // The total time range of this table spanning all of the `RowGroup`s within // the table. @@ -1261,10 +1276,17 @@ impl MetaData { } // Extract schema information for a set of columns. - fn schema_for_column_names(&self, names: &[ColumnName<'_>]) -> Vec<(String, LogicalDataType)> { + fn schema_for_column_names( + &self, + names: &[ColumnName<'_>], + ) -> Vec<(crate::schema::ColumnType, LogicalDataType)> { names .iter() - .map(|&name| (name.to_owned(), *self.column_types.get(name).unwrap())) + .map(|&name| { + let col_type = self.column_types.get(name).unwrap(); + let data_type = self.column_data_types.get(name).unwrap(); + (*col_type.clone(), *data_type) + }) .collect::>() } @@ -1272,15 +1294,14 @@ impl MetaData { fn schema_for_aggregate_column_names( &self, columns: &[(ColumnName<'_>, AggregateType)], - ) -> Vec<(String, AggregateType, LogicalDataType)> { + ) -> Vec<(crate::schema::ColumnType, AggregateType, LogicalDataType)> { columns .iter() .map(|(name, agg_type)| { - ( - name.to_string(), - *agg_type, - *self.column_types.get(*name).unwrap(), - ) + let col_type = self.column_types.get(*name).unwrap(); + let data_type = self.column_data_types.get(*name).unwrap(); + + (*col_type.clone(), *agg_type, *data_type) }) .collect::>() } @@ -1466,6 +1487,7 @@ impl std::fmt::Display for &ReadAggregateResult<'_> { #[cfg(test)] mod test { use super::*; + use crate::schema; // Helper function that creates a predicate from a single expression fn col_pred(expr: BinaryExpr) -> Predicate { @@ -1951,17 +1973,23 @@ west,POST,304,101,203 schema: ResultSchema { select_columns: vec![], group_columns: vec![ - ("region".to_owned(), LogicalDataType::String), - ("host".to_owned(), LogicalDataType::String), + ( + schema::ColumnType::Tag("region".to_owned()), + LogicalDataType::String, + ), + ( + schema::ColumnType::Tag("host".to_owned()), + LogicalDataType::String, + ), ], aggregate_columns: vec![ ( - "temp".to_owned(), + schema::ColumnType::Field("temp".to_owned()), AggregateType::Sum, LogicalDataType::Integer, ), ( - "voltage".to_owned(), + schema::ColumnType::Field("voltage".to_owned()), AggregateType::Count, LogicalDataType::Unsigned, ), diff --git a/read_buffer/src/schema.rs b/read_buffer/src/schema.rs index d132998786..1736838f18 100644 --- a/read_buffer/src/schema.rs +++ b/read_buffer/src/schema.rs @@ -10,24 +10,39 @@ use arrow_deps::arrow; /// the read buffer. #[derive(Default, PartialEq, Debug)] pub struct ResultSchema { - pub select_columns: Vec<(String, LogicalDataType)>, - pub group_columns: Vec<(String, LogicalDataType)>, - pub aggregate_columns: Vec<(String, AggregateType, LogicalDataType)>, + pub select_columns: Vec<(ColumnType, LogicalDataType)>, + pub group_columns: Vec<(ColumnType, LogicalDataType)>, + pub aggregate_columns: Vec<(ColumnType, AggregateType, LogicalDataType)>, } impl ResultSchema { - pub fn select_column_names_iter(&self) -> impl Iterator { - self.select_columns.iter().map(|(name, _)| name.as_str()) + pub fn select_column_names_iter(&self) -> impl Iterator { + self.select_columns.iter().map(|(name, _)| match name { + ColumnType::Tag(name) => name, + ColumnType::Field(name) => name, + ColumnType::Timestamp(name) => name, + ColumnType::Other(name) => name, + }) } - pub fn group_column_names_iter(&self) -> impl Iterator { - self.group_columns.iter().map(|(name, _)| name.as_str()) + pub fn group_column_names_iter(&self) -> impl Iterator { + self.group_columns.iter().map(|(name, _)| match name { + ColumnType::Tag(name) => name, + ColumnType::Field(name) => name, + ColumnType::Timestamp(name) => name, + ColumnType::Other(name) => name, + }) } - pub fn aggregate_column_names_iter(&self) -> impl Iterator { + pub fn aggregate_column_names_iter(&self) -> impl Iterator { self.aggregate_columns .iter() - .map(|(name, _, _)| name.as_str()) + .map(|(name, _, _)| match name { + ColumnType::Tag(name) => name, + ColumnType::Field(name) => name, + ColumnType::Timestamp(name) => name, + ColumnType::Other(name) => name, + }) } } @@ -118,3 +133,39 @@ impl std::fmt::Display for AggregateType { ) } } + +/// Describes the semantic meaning of the column in a set of results. That is, +/// whether the column is a "tag", "field", "timestamp", or "other". +#[derive(PartialEq, Debug, PartialOrd)] +pub enum ColumnType { + Tag(String), + Field(String), + Timestamp(String), + Other(String), +} + +impl ColumnType { + pub fn as_str(&self) -> &str { + match self { + ColumnType::Tag(name) => name.as_str(), + ColumnType::Field(name) => name.as_str(), + ColumnType::Timestamp(name) => name.as_str(), + ColumnType::Other(name) => name.as_str(), + } + } +} + +impl Display for ColumnType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + ColumnType::Tag(name) => name, + ColumnType::Field(name) => name, + ColumnType::Timestamp(name) => name, + ColumnType::Other(name) => name, + } + ) + } +} diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 689e61f767..9445bb9c43 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -4,7 +4,7 @@ use std::slice::Iter; use crate::column::{AggregateResult, OwnedValue, Scalar, Value}; use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup}; -use crate::schema::{AggregateType, LogicalDataType, ResultSchema}; +use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema}; /// A Table represents data for a single measurement. /// @@ -94,7 +94,7 @@ impl Table { /// The logical data-type of each column in the `Table`'s schema. pub fn column_logical_types(&self) -> &BTreeMap { - &self.meta.column_types + &self.meta.column_data_types } // Identify set of row groups that might satisfy the predicate. @@ -134,7 +134,7 @@ impl Table { let rgs = self.filter_row_groups(predicate); let schema = match columns { - ColumnSelection::All => self.meta.schema(), + ColumnSelection::All => self.meta.schema_for_all_columns(), ColumnSelection::Some(column_names) => self.meta.schema_for_column_names(column_names), }; @@ -422,8 +422,11 @@ struct MetaData { // possibly match based on the range of values a column has. column_ranges: BTreeMap, - // The `ReadBuffer` logical types associated with the columns in the table - column_types: BTreeMap, + // The semantic type of all columns in the table. + column_types: BTreeMap, + + // The logical data type of all columns in the table. + column_data_types: BTreeMap, column_names: Vec, @@ -440,16 +443,9 @@ impl MetaData { Self { size: rg.size(), rows: u64::from(rg.rows()), - column_ranges: rg - .column_ranges() - .iter() - .map(|(k, v)| (k.to_string(), (v.0.clone(), v.1.clone()))) - .collect(), - column_types: rg - .column_logical_types() - .iter() - .map(|(k, v)| (k.clone(), *v)) - .collect(), + column_ranges: rg.column_ranges().clone(), + column_types: *rg.column_types().clone(), + column_data_types: rg.column_logical_types().clone(), column_names: rg.column_ranges().keys().cloned().collect(), time_range: Some(rg.time_range()), } @@ -458,37 +454,50 @@ impl MetaData { // Extract schema information for a set of columns. If a column name does // not exist within the `Table` schema it is ignored and not present within // the resulting schema information. - fn schema_for_column_names(&self, names: &[ColumnName<'_>]) -> Vec<(String, LogicalDataType)> { + fn schema_for_column_names( + &self, + names: &[ColumnName<'_>], + ) -> Vec<(ColumnType, LogicalDataType)> { names .iter() - .filter_map(|&name| match self.column_types.get_key_value(name) { - Some((name, data_type)) => Some((name.clone(), *data_type)), + .filter_map(|&name| match self.column_types.get(name) { + Some(column_type) => Some(( + *column_type.clone(), + *self.column_data_types.get(name).unwrap(), + )), None => None, }) .collect::>() } + // As `schema_for_column_names` but for all columns in the table. + fn schema_for_all_columns(&self) -> Vec<(ColumnType, LogicalDataType)> { + self.column_types + .iter() + .map(|(name, column_type)| { + ( + *column_type.clone(), + *self.column_data_types.get(name).unwrap(), + ) + }) + .collect::>() + } + // As `schema_for_column_names` but also embeds the provided aggregate type. fn schema_for_aggregate_column_names( &self, names: &[(ColumnName<'_>, AggregateType)], - ) -> Vec<(String, AggregateType, LogicalDataType)> { + ) -> Vec<(ColumnType, AggregateType, LogicalDataType)> { names .iter() - .filter_map( - |(name, agg_type)| match self.column_types.get_key_value(*name) { - Some((name, data_type)) => Some((name.clone(), *agg_type, *data_type)), - None => None, - }, - ) - .collect::>() - } - - // Extract all schema information for the `Table`. - fn schema(&self) -> Vec<(String, LogicalDataType)> { - self.column_types - .iter() - .map(|(k, v)| (k.clone(), *v)) + .filter_map(|(name, agg_type)| match self.column_types.get(*name) { + Some(column_type) => Some(( + *column_type.clone(), + *agg_type, + *self.column_data_types.get(*name).unwrap(), + )), + None => None, + }) .collect::>() } @@ -501,9 +510,10 @@ impl MetaData { self.size += rg.size(); self.rows += u64::from(rg.rows()); - // The incoming row group must have the same schema as the existing row - // groups in the table. - assert_eq!(&self.column_types, rg.column_logical_types()); + // The incoming row group must have exactly the same schema as the + // existing row groups in the table. + assert_eq!(&self.column_types, rg.column_types()); + assert_eq!(&self.column_data_types, rg.column_logical_types()); assert_eq!(self.column_ranges.len(), rg.column_ranges().len()); for (column_name, (column_range_min, column_range_max)) in rg.column_ranges() { @@ -659,7 +669,11 @@ impl<'a> Iterator for ReadAggregateResults<'a> { let merged_results = self.row_groups.remove(0).read_aggregate( &self.predicate, - &self.schema.group_column_names_iter().collect::>(), + &self + .schema + .group_column_names_iter() + .map(|s| s.as_str()) + .collect::>(), &self .schema .aggregate_columns @@ -674,7 +688,11 @@ impl<'a> Iterator for ReadAggregateResults<'a> { for row_group in &self.row_groups { let result = row_group.read_aggregate( &self.predicate, - &self.schema.group_column_names_iter().collect::>(), + &self + .schema + .group_column_names_iter() + .map(|s| s.as_str()) + .collect::>(), &self .schema .aggregate_columns @@ -722,6 +740,7 @@ mod test { use super::*; use crate::column::Column; use crate::row_group::{BinaryExpr, ColumnType, ReadAggregateResult}; + use crate::schema; use crate::schema::LogicalDataType; #[test] @@ -829,11 +848,17 @@ mod test { schema: ResultSchema { select_columns: vec![], group_columns: vec![ - ("region".to_owned(), LogicalDataType::String), - ("host".to_owned(), LogicalDataType::String), + ( + schema::ColumnType::Tag("region".to_owned()), + LogicalDataType::String, + ), + ( + schema::ColumnType::Tag("host".to_owned()), + LogicalDataType::String, + ), ], aggregate_columns: vec![( - "temp".to_owned(), + schema::ColumnType::Tag("temp".to_owned()), AggregateType::Sum, LogicalDataType::Integer, )], @@ -845,11 +870,17 @@ mod test { schema: ResultSchema { select_columns: vec![], group_columns: vec![ - ("region".to_owned(), LogicalDataType::String), - ("host".to_owned(), LogicalDataType::String), + ( + schema::ColumnType::Tag("region".to_owned()), + LogicalDataType::String, + ), + ( + schema::ColumnType::Tag("host".to_owned()), + LogicalDataType::String, + ), ], aggregate_columns: vec![( - "temp".to_owned(), + schema::ColumnType::Tag("temp".to_owned()), AggregateType::Sum, LogicalDataType::Integer, )], From 864e9e4dac1d21cb189bccab370be923b884eed7 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 18 Jan 2021 13:21:39 +0000 Subject: [PATCH 04/11] refactor: tidy up columns in row group --- read_buffer/src/row_group.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 25dc16191d..3a2461d70d 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -32,8 +32,6 @@ pub struct RowGroup { columns: Vec, all_columns_by_name: BTreeMap, - tag_columns_by_name: BTreeMap, - field_columns_by_name: BTreeMap, time_column: usize, } @@ -46,8 +44,6 @@ impl RowGroup { let mut all_columns = vec![]; let mut all_columns_by_name = BTreeMap::new(); - let mut tag_columns_by_name = BTreeMap::new(); - let mut field_columns_by_name = BTreeMap::new(); let mut time_column = None; for (name, ct) in columns { @@ -61,7 +57,6 @@ impl RowGroup { meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); all_columns_by_name.insert(name.clone(), all_columns.len()); - tag_columns_by_name.insert(name, all_columns.len()); all_columns.push(c); } ColumnType::Field(c) => { @@ -72,7 +67,6 @@ impl RowGroup { meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); all_columns_by_name.insert(name.clone(), all_columns.len()); - field_columns_by_name.insert(name, all_columns.len()); all_columns.push(c); } ColumnType::Time(c) => { @@ -109,8 +103,6 @@ impl RowGroup { meta, columns: all_columns, all_columns_by_name, - tag_columns_by_name, - field_columns_by_name, time_column: time_column.unwrap(), } } From 93f4f8aa4135384f8437d2e06d8a80b70c9db61b Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 18 Jan 2021 16:22:19 +0000 Subject: [PATCH 05/11] feat: teach read_buffer schema -> data_types schema --- read_buffer/src/schema.rs | 46 ++++++++++++++++++++++++++++++++++----- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/read_buffer/src/schema.rs b/read_buffer/src/schema.rs index 1736838f18..1d7eec7bb0 100644 --- a/read_buffer/src/schema.rs +++ b/read_buffer/src/schema.rs @@ -1,6 +1,7 @@ -use std::fmt::Display; +use std::{convert::TryFrom, fmt::Display}; use arrow_deps::arrow; +use data_types::schema::InfluxFieldType; /// A schema that is used to track the names and semantics of columns returned /// in results out of various operations on a row group. @@ -77,6 +78,26 @@ impl Display for ResultSchema { } } +impl TryFrom<&ResultSchema> for data_types::schema::Schema { + type Error = data_types::schema::builder::Error; + + fn try_from(rs: &ResultSchema) -> Result { + let mut builder = data_types::schema::builder::SchemaBuilder::new(); + for (col_type, data_type) in &rs.select_columns { + match col_type { + ColumnType::Tag(name) => builder = builder.tag(name.as_str()), + ColumnType::Field(name) => { + builder = builder.influx_field(name.as_str(), data_type.into()) + } + ColumnType::Timestamp(_) => builder = builder.timestamp(), + ColumnType::Other(name) => builder = builder.field(name.as_str(), data_type.into()), + } + } + + builder.build() + } +} + #[derive(Debug, Copy, Clone, Eq, PartialEq)] /// The logical data-type for a column. pub enum LogicalDataType { @@ -88,9 +109,9 @@ pub enum LogicalDataType { Boolean, // } -impl LogicalDataType { - pub fn to_arrow_datatype(&self) -> arrow::datatypes::DataType { - match &self { +impl From<&LogicalDataType> for arrow::datatypes::DataType { + fn from(logical_type: &LogicalDataType) -> Self { + match logical_type { LogicalDataType::Integer => arrow::datatypes::DataType::Int64, LogicalDataType::Unsigned => arrow::datatypes::DataType::UInt64, LogicalDataType::Float => arrow::datatypes::DataType::Float64, @@ -101,6 +122,21 @@ impl LogicalDataType { } } +impl From<&LogicalDataType> for InfluxFieldType { + fn from(logical_type: &LogicalDataType) -> Self { + match logical_type { + LogicalDataType::Integer => InfluxFieldType::Integer, + LogicalDataType::Unsigned => InfluxFieldType::UInteger, + LogicalDataType::Float => InfluxFieldType::Float, + LogicalDataType::String => InfluxFieldType::String, + LogicalDataType::Binary => { + unimplemented!("binary data type cannot be represented as InfluxFieldType") + } + LogicalDataType::Boolean => InfluxFieldType::Boolean, + } + } +} + /// These variants describe supported aggregates that can applied to columnar /// data in the Read Buffer. #[derive(Copy, Clone, PartialEq, Debug)] @@ -136,7 +172,7 @@ impl std::fmt::Display for AggregateType { /// Describes the semantic meaning of the column in a set of results. That is, /// whether the column is a "tag", "field", "timestamp", or "other". -#[derive(PartialEq, Debug, PartialOrd)] +#[derive(PartialEq, Debug, PartialOrd, Clone)] pub enum ColumnType { Tag(String), Field(String), From 0c7424465d96813ae1d71883e0b03cbc9e3490b2 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 18 Jan 2021 16:22:34 +0000 Subject: [PATCH 06/11] refactor: use schema type for read_filter --- read_buffer/src/lib.rs | 11 +++- read_buffer/src/row_group.rs | 106 +++++++++++++++++------------------ read_buffer/src/table.rs | 77 +++++++++++++------------ 3 files changed, 102 insertions(+), 92 deletions(-) diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index 8e2febc4ba..490ad54351 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -10,6 +10,7 @@ pub(crate) mod table; use std::{ collections::{btree_map::Entry, BTreeMap, BTreeSet}, + convert::TryInto, fmt, sync::Arc, }; @@ -52,6 +53,11 @@ pub enum Error { #[snafu(display("unsupported operation: {}", msg))] UnsupportedOperation { msg: String }, + + #[snafu(display("schema conversion error: {}", source))] + SchemaError { + source: data_types::schema::builder::Error, + }, } pub type Result = std::result::Result; @@ -524,7 +530,10 @@ impl<'input, 'chunk> Iterator for ReadFilterResults<'input, 'chunk> { Some(table_results) => { // Table has found results in a row group. if let Some(row_group_result) = table_results.next() { - return Some(row_group_result.record_batch()); + // it should not be possible for the conversion to record + // batch to fail here + let rb = row_group_result.try_into(); + return Some(rb.unwrap()); } // no more results for row groups in the table. Try next chunk. diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 3a2461d70d..84637dcf8f 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -2,7 +2,6 @@ use std::{ borrow::Cow, collections::BTreeMap, convert::{TryFrom, TryInto}, - sync::Arc, }; use hashbrown::{hash_map, HashMap}; @@ -12,6 +11,7 @@ use crate::column::{ cmp::Operator, AggregateResult, Column, EncodedValues, OwnedValue, RowIDs, RowIDsOption, Scalar, Value, Values, ValuesIterator, }; +use crate::schema; use crate::schema::{AggregateType, LogicalDataType, ResultSchema}; use arrow_deps::arrow::record_batch::RecordBatch; use arrow_deps::{ @@ -52,20 +52,24 @@ impl RowGroup { ColumnType::Tag(c) => { assert_eq!(c.num_rows(), rows); - meta.column_data_types - .insert(name.clone(), c.logical_datatype()); meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); + meta.column_data_types + .insert(name.clone(), c.logical_datatype()); + meta.column_types + .insert(name.clone(), schema::ColumnType::Tag(name.clone())); all_columns_by_name.insert(name.clone(), all_columns.len()); all_columns.push(c); } ColumnType::Field(c) => { assert_eq!(c.num_rows(), rows); - meta.column_data_types - .insert(name.clone(), c.logical_datatype()); meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); + meta.column_data_types + .insert(name.clone(), c.logical_datatype()); + meta.column_types + .insert(name.clone(), schema::ColumnType::Field(name.clone())); all_columns_by_name.insert(name.clone(), all_columns.len()); all_columns.push(c); } @@ -81,10 +85,12 @@ impl RowGroup { Some((_, _)) => unreachable!("unexpected types for time range"), }; - meta.column_data_types - .insert(name.clone(), c.logical_datatype()); meta.column_ranges .insert(name.clone(), c.column_range().unwrap()); + meta.column_data_types + .insert(name.clone(), c.logical_datatype()); + meta.column_types + .insert(name.clone(), schema::ColumnType::Timestamp(name.clone())); all_columns_by_name.insert(name.clone(), all_columns.len()); time_column = Some(all_columns.len()); @@ -186,37 +192,37 @@ impl RowGroup { columns: &[ColumnName<'_>], predicates: &Predicate, ) -> ReadFilterResult<'_> { - let row_ids = self.row_ids_from_predicates(predicates); + let select_columns = self.meta.schema_for_column_names(&columns); + assert_eq!(select_columns.len(), columns.len()); - // ensure meta/data have same lifetime by using column names from row - // group rather than from input. - let (col_names, col_data) = self.materialise_rows(columns, row_ids); - let schema = self.meta.schema_for_column_names(&col_names); + let schema = ResultSchema { + select_columns, + group_columns: vec![], + aggregate_columns: vec![], + }; + + // apply predicates to determine candidate rows. + let row_ids = self.row_ids_from_predicates(predicates); + let col_data = self.materialise_rows(columns, row_ids); ReadFilterResult { schema, data: col_data, } } - fn materialise_rows( - &self, - names: &[ColumnName<'_>], - row_ids: RowIDsOption, - ) -> (Vec<&str>, Vec>) { - let mut col_names = Vec::with_capacity(names.len()); + fn materialise_rows(&self, names: &[ColumnName<'_>], row_ids: RowIDsOption) -> Vec> { let mut col_data = Vec::with_capacity(names.len()); match row_ids { - RowIDsOption::None(_) => (col_names, col_data), // nothing to materialise + RowIDsOption::None(_) => col_data, // nothing to materialise RowIDsOption::Some(row_ids) => { // TODO(edd): causes an allocation. Implement a way to pass a // pooled buffer to the croaring Bitmap API. let row_ids = row_ids.to_vec(); for &name in names { - let (col_name, col) = self.column_name_and_column(name); - col_names.push(col_name); + let (_, col) = self.column_name_and_column(name); col_data.push(col.values(row_ids.as_slice())); } - (col_names, col_data) + col_data } RowIDsOption::All(_) => { @@ -226,11 +232,10 @@ impl RowGroup { let row_ids = (0..self.rows()).collect::>(); for &name in names { - let (col_name, col) = self.column_name_and_column(name); - col_names.push(col_name); + let (_, col) = self.column_name_and_column(name); col_data.push(col.values(row_ids.as_slice())); } - (col_names, col_data) + col_data } } } @@ -1277,7 +1282,7 @@ impl MetaData { .map(|&name| { let col_type = self.column_types.get(name).unwrap(); let data_type = self.column_data_types.get(name).unwrap(); - (*col_type.clone(), *data_type) + (col_type.clone(), *data_type) }) .collect::>() } @@ -1293,7 +1298,7 @@ impl MetaData { let col_type = self.column_types.get(*name).unwrap(); let data_type = self.column_data_types.get(*name).unwrap(); - (*col_type.clone(), *agg_type, *data_type) + (col_type.clone(), *agg_type, *data_type) }) .collect::>() } @@ -1302,8 +1307,7 @@ impl MetaData { /// Encapsulates results from `RowGroup`s with a structure that makes them /// easier to work with and display. pub struct ReadFilterResult<'row_group> { - /// tuples of the form (column_name, data_type) - schema: Vec<(String, LogicalDataType)>, + schema: ResultSchema, data: Vec>, } @@ -1312,23 +1316,20 @@ impl ReadFilterResult<'_> { self.data.is_empty() } - pub fn schema(&self) -> &Vec<(String, LogicalDataType)> { + pub fn schema(&self) -> &ResultSchema { &self.schema } +} - /// Produces a `RecordBatch` from the results, giving up ownership to the - /// returned record batch. - pub fn record_batch(self) -> arrow::record_batch::RecordBatch { - let schema = arrow::datatypes::Schema::new( - self.schema() - .iter() - .map(|(col_name, col_typ)| { - arrow::datatypes::Field::new(col_name, col_typ.to_arrow_datatype(), true) - }) - .collect::>(), - ); +impl TryFrom> for RecordBatch { + type Error = crate::Error; - let columns = self + fn try_from(result: ReadFilterResult<'_>) -> Result { + let schema = data_types::schema::Schema::try_from(result.schema()) + .map_err(|source| crate::Error::SchemaError { source })?; + let arrow_schema: arrow_deps::arrow::datatypes::SchemaRef = schema.into(); + + let columns = result .data .into_iter() .map(arrow::array::ArrayRef::from) @@ -1337,21 +1338,15 @@ impl ReadFilterResult<'_> { // try_new only returns an error if the schema is invalid or the number // of rows on columns differ. We have full control over both so there // should never be an error to return... - arrow::record_batch::RecordBatch::try_new(Arc::new(schema), columns).unwrap() + arrow::record_batch::RecordBatch::try_new(arrow_schema, columns) + .map_err(|source| crate::Error::ArrowError { source }) } } impl std::fmt::Debug for &ReadFilterResult<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // header line. - for (i, (k, _)) in self.schema.iter().enumerate() { - write!(f, "{}", k)?; - - if i < self.schema.len() - 1 { - write!(f, ",")?; - } - } - writeln!(f)?; + // Display the header + std::fmt::Display::fmt(self.schema(), f)?; // Display the rest of the values. std::fmt::Display::fmt(&self, f) @@ -1373,14 +1368,15 @@ impl std::fmt::Display for &ReadFilterResult<'_> { .map(|v| ValuesIterator::new(v)) .collect::>(); + let columns = iter_map.len(); while rows < expected_rows { if rows > 0 { writeln!(f)?; } - for (i, (k, _)) in self.schema.iter().enumerate() { - write!(f, "{}", iter_map[i].next().unwrap())?; - if i < self.schema.len() - 1 { + for (i, data) in iter_map.iter_mut().enumerate() { + write!(f, "{}", data.next().unwrap())?; + if i < columns - 1 { write!(f, ",")?; } } diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 9445bb9c43..5f39d74d6a 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -133,18 +133,18 @@ impl Table { // and merge results. let rgs = self.filter_row_groups(predicate); - let schema = match columns { - ColumnSelection::All => self.meta.schema_for_all_columns(), - ColumnSelection::Some(column_names) => self.meta.schema_for_column_names(column_names), + let schema = ResultSchema { + select_columns: match columns { + ColumnSelection::All => self.meta.schema_for_all_columns(), + ColumnSelection::Some(column_names) => { + self.meta.schema_for_column_names(column_names) + } + }, + ..ResultSchema::default() }; - // temp I think I can remove `columns` and `predicates` from this.. - let columns = schema - .iter() - .map(|(name, _)| name.to_string()) - .collect::>(); + // temp I think I can remove `predicates` from the results ReadFilterResults { - columns, predicate: predicate.clone(), schema, row_groups: rgs, @@ -444,7 +444,7 @@ impl MetaData { size: rg.size(), rows: u64::from(rg.rows()), column_ranges: rg.column_ranges().clone(), - column_types: *rg.column_types().clone(), + column_types: rg.column_types().clone(), column_data_types: rg.column_logical_types().clone(), column_names: rg.column_ranges().keys().cloned().collect(), time_range: Some(rg.time_range()), @@ -462,7 +462,7 @@ impl MetaData { .iter() .filter_map(|&name| match self.column_types.get(name) { Some(column_type) => Some(( - *column_type.clone(), + column_type.clone(), *self.column_data_types.get(name).unwrap(), )), None => None, @@ -476,7 +476,7 @@ impl MetaData { .iter() .map(|(name, column_type)| { ( - *column_type.clone(), + column_type.clone(), *self.column_data_types.get(name).unwrap(), ) }) @@ -492,7 +492,7 @@ impl MetaData { .iter() .filter_map(|(name, agg_type)| match self.column_types.get(*name) { Some(column_type) => Some(( - *column_type.clone(), + column_type.clone(), *agg_type, *self.column_data_types.get(*name).unwrap(), )), @@ -552,14 +552,13 @@ pub enum ColumnSelection<'a> { /// row groups are only queried when `ReadFilterResults` is iterated. pub struct ReadFilterResults<'table> { // schema of all columns in the query results - schema: Vec<(String, LogicalDataType)>, + schema: ResultSchema, // These row groups passed the predicates and need to be queried. row_groups: Vec<&'table RowGroup>, - // TODO(edd): encapsulate these into a single executor function that just + // TODO(edd): encapsulate this into a single executor function that just // executes on the next row group. - columns: Vec, predicate: Predicate, } @@ -570,7 +569,7 @@ impl<'table> ReadFilterResults<'table> { /// Returns the schema associated with table result and therefore all of the /// results for all of row groups in the table results. - pub fn schema(&self) -> &Vec<(String, LogicalDataType)> { + pub fn schema(&self) -> &ResultSchema { &self.schema } } @@ -586,8 +585,8 @@ impl<'a> Iterator for ReadFilterResults<'a> { let row_group = self.row_groups.remove(0); let result = row_group.read_filter( &self - .columns - .iter() + .schema() + .select_column_names_iter() .map(|name| name.as_str()) .collect::>(), &self.predicate, @@ -610,21 +609,14 @@ impl<'a> Display for DisplayReadFilterResults<'a> { return Ok(()); } - let schema = self.0[0].schema(); - // header line. - for (i, (k, _)) in schema.iter().enumerate() { - write!(f, "{}", k)?; + // write out the schema of the first result as the table header + std::fmt::Display::fmt(&self.0[0].schema(), f)?; - if i < schema.len() - 1 { - write!(f, ",")?; - } + // write out each row group result + for row_group in self.0.iter() { + std::fmt::Display::fmt(&row_group, f)?; } - writeln!(f)?; - // Display all the results of each row group - for row_group in &self.0 { - row_group.fmt(f)?; - } Ok(()) } } @@ -789,11 +781,24 @@ mod test { ); // check the column types - let exp_schema = vec![ - ("time".to_owned(), LogicalDataType::Integer), - ("count".to_owned(), LogicalDataType::Unsigned), - ("region".to_owned(), LogicalDataType::String), - ]; + let exp_schema = ResultSchema { + select_columns: vec![ + ( + schema::ColumnType::Timestamp("time".to_owned()), + LogicalDataType::Integer, + ), + ( + schema::ColumnType::Field("count".to_owned()), + LogicalDataType::Unsigned, + ), + ( + schema::ColumnType::Tag("region".to_owned()), + LogicalDataType::String, + ), + ], + ..ResultSchema::default() + }; + assert_eq!(results.schema(), &exp_schema); let mut all = vec![]; From 9f65c4b6ef5965a64f30403954d0e1fd508063c4 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 18 Jan 2021 17:53:06 +0000 Subject: [PATCH 07/11] refactor: encapsulate column meta data --- read_buffer/src/row_group.rs | 156 ++++++++++++++++++----------- read_buffer/src/table.rs | 185 ++++++++++++++++++++++------------- 2 files changed, 220 insertions(+), 121 deletions(-) diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 84637dcf8f..e6e31339b2 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -52,24 +52,25 @@ impl RowGroup { ColumnType::Tag(c) => { assert_eq!(c.num_rows(), rows); - meta.column_ranges - .insert(name.clone(), c.column_range().unwrap()); - meta.column_data_types - .insert(name.clone(), c.logical_datatype()); - meta.column_types - .insert(name.clone(), schema::ColumnType::Tag(name.clone())); + meta.add_column( + &name, + schema::ColumnType::Tag(name.clone()), + c.logical_datatype(), + c.column_range().unwrap(), + ); + all_columns_by_name.insert(name.clone(), all_columns.len()); all_columns.push(c); } ColumnType::Field(c) => { assert_eq!(c.num_rows(), rows); - meta.column_ranges - .insert(name.clone(), c.column_range().unwrap()); - meta.column_data_types - .insert(name.clone(), c.logical_datatype()); - meta.column_types - .insert(name.clone(), schema::ColumnType::Field(name.clone())); + meta.add_column( + &name, + schema::ColumnType::Field(name.clone()), + c.logical_datatype(), + c.column_range().unwrap(), + ); all_columns_by_name.insert(name.clone(), all_columns.len()); all_columns.push(c); } @@ -85,12 +86,12 @@ impl RowGroup { Some((_, _)) => unreachable!("unexpected types for time range"), }; - meta.column_ranges - .insert(name.clone(), c.column_range().unwrap()); - meta.column_data_types - .insert(name.clone(), c.logical_datatype()); - meta.column_types - .insert(name.clone(), schema::ColumnType::Timestamp(name.clone())); + meta.add_column( + &name, + schema::ColumnType::Timestamp(name.clone()), + c.logical_datatype(), + c.column_range().unwrap(), + ); all_columns_by_name.insert(name.clone(), all_columns.len()); time_column = Some(all_columns.len()); @@ -100,10 +101,7 @@ impl RowGroup { } // Meta data should have same columns for types and ranges. - assert_eq!( - meta.column_data_types.keys().collect::>(), - meta.column_ranges.keys().collect::>(), - ); + assert_eq!(meta.columns.keys().len(), all_columns.len()); Self { meta, @@ -124,19 +122,9 @@ impl RowGroup { self.meta.rows } - /// The ranges on each column in the `RowGroup`. - pub fn column_ranges(&self) -> &BTreeMap { - &self.meta.column_ranges - } - - /// The semantic types of each column. - pub fn column_types(&self) -> &BTreeMap { - &self.meta.column_types - } - - /// The logical data-types of each column. - pub fn column_logical_types(&self) -> &BTreeMap { - &self.meta.column_data_types + // The row group's meta data. + pub fn metadata(&self) -> &MetaData { + &self.meta } // Returns a reference to a column from the column name. @@ -1203,13 +1191,28 @@ impl ColumnType { } } +#[derive(Debug, Clone)] +pub struct ColumnMeta { + pub typ: crate::schema::ColumnType, + pub logical_data_type: LogicalDataType, + pub range: (OwnedValue, OwnedValue), +} + +// column metadata is equivalent for two columns if their logical type and +// semantic type are equivalent. +impl PartialEq for ColumnMeta { + fn eq(&self, other: &Self) -> bool { + self.typ == other.typ && self.logical_data_type == other.logical_data_type + } +} + #[derive(Default, Debug)] -struct MetaData { +pub struct MetaData { // The total size of the table in bytes. - size: u64, + pub size: u64, // The total number of rows in the table. - rows: u32, + pub rows: u32, // The distinct set of columns for this `RowGroup` (all of these columns // will appear in all of the `Table`'s `RowGroup`s) and the range of values @@ -1217,20 +1220,14 @@ struct MetaData { // // This can be used to skip the table entirely if a logical predicate can't // possibly match based on the range of values a column has. - column_ranges: BTreeMap, - - // The semantic type of the columns in the row group - column_types: BTreeMap, - - // The logical data type of the columns in the row group. - column_data_types: BTreeMap, + pub columns: BTreeMap, // The total time range of this table spanning all of the `RowGroup`s within // the table. // // This can be used to skip the table entirely if the time range for a query // falls outside of this range. - time_range: (i64, i64), + pub time_range: (i64, i64), } impl MetaData { @@ -1239,8 +1236,8 @@ impl MetaData { // no rows in the `RowGroup` would ever match the expression. // pub fn column_could_satisfy_binary_expr(&self, expr: &BinaryExpr) -> bool { - let (column_min, column_max) = match self.column_ranges.get(expr.column()) { - Some(range) => range, + let (column_min, column_max) = match self.columns.get(expr.column()) { + Some(schema) => &schema.range, None => return false, // column doesn't exist. }; @@ -1272,6 +1269,23 @@ impl MetaData { } } + pub fn add_column( + &mut self, + name: &str, + col_type: schema::ColumnType, + logical_data_type: LogicalDataType, + range: (OwnedValue, OwnedValue), + ) { + self.columns.insert( + name.to_owned(), + ColumnMeta { + typ: col_type, + logical_data_type, + range, + }, + ); + } + // Extract schema information for a set of columns. fn schema_for_column_names( &self, @@ -1280,9 +1294,8 @@ impl MetaData { names .iter() .map(|&name| { - let col_type = self.column_types.get(name).unwrap(); - let data_type = self.column_data_types.get(name).unwrap(); - (col_type.clone(), *data_type) + let schema = self.columns.get(name).unwrap(); + (schema.typ.clone(), schema.logical_data_type) }) .collect::>() } @@ -1295,10 +1308,8 @@ impl MetaData { columns .iter() .map(|(name, agg_type)| { - let col_type = self.column_types.get(*name).unwrap(); - let data_type = self.column_data_types.get(*name).unwrap(); - - (col_type.clone(), *agg_type, *data_type) + let schema = self.columns.get(*name).unwrap(); + (schema.typ.clone(), *agg_type, schema.logical_data_type) }) .collect::>() } @@ -1347,6 +1358,7 @@ impl std::fmt::Debug for &ReadFilterResult<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // Display the header std::fmt::Display::fmt(self.schema(), f)?; + writeln!(f)?; // Display the rest of the values. std::fmt::Display::fmt(&self, f) @@ -2037,4 +2049,38 @@ west,host-d,11,9 " ); } + + #[test] + fn column_meta_equal() { + let col1 = ColumnMeta { + typ: schema::ColumnType::Tag("region".to_owned()), + logical_data_type: schema::LogicalDataType::String, + range: ( + OwnedValue::String("east".to_owned()), + OwnedValue::String("west".to_owned()), + ), + }; + + let col2 = ColumnMeta { + typ: schema::ColumnType::Tag("region".to_owned()), + logical_data_type: schema::LogicalDataType::String, + range: ( + OwnedValue::String("north".to_owned()), + OwnedValue::String("west".to_owned()), + ), + }; + + let col3 = ColumnMeta { + typ: schema::ColumnType::Tag("host".to_owned()), + logical_data_type: schema::LogicalDataType::String, + range: ( + OwnedValue::String("east".to_owned()), + OwnedValue::String("west".to_owned()), + ), + }; + + assert_eq!(col1, col2); + assert_ne!(col1, col3); + assert_ne!(col2, col3); + } } diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 5f39d74d6a..293fc5163d 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Display; use std::slice::Iter; -use crate::column::{AggregateResult, OwnedValue, Scalar, Value}; +use crate::column::{AggregateResult, Scalar, Value}; use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup}; use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema}; @@ -36,14 +36,14 @@ impl Table { pub fn new(name: String, rg: RowGroup) -> Self { Self { name, - meta: MetaData::new(&rg), + meta: MetaData::new(rg.metadata()), row_groups: vec![rg], } } /// Add a new row group to this table. pub fn add_row_group(&mut self, rg: RowGroup) { - self.meta.update(&rg); + self.meta.update(rg.metadata()); self.row_groups.push(rg); } @@ -87,16 +87,6 @@ impl Table { self.meta.time_range } - /// The ranges on each column in the table (across all row groups). - pub fn column_ranges(&self) -> BTreeMap { - todo!() - } - - /// The logical data-type of each column in the `Table`'s schema. - pub fn column_logical_types(&self) -> &BTreeMap { - &self.meta.column_data_types - } - // Identify set of row groups that might satisfy the predicate. fn filter_row_groups(&self, predicate: &Predicate) -> Vec<&RowGroup> { let mut rgs = Vec::with_capacity(self.row_groups.len()); @@ -415,18 +405,9 @@ struct MetaData { rows: u64, // The distinct set of columns for this table (all of these columns will - // appear in all of the table's row groups) and the range of values for - // each of those columns. - // - // This can be used to skip the table entirely if a logical predicate can't - // possibly match based on the range of values a column has. - column_ranges: BTreeMap, - - // The semantic type of all columns in the table. - column_types: BTreeMap, - - // The logical data type of all columns in the table. - column_data_types: BTreeMap, + // appear in all of the table's row groups) and meta data about those + // columns including their schema and range. + columns: BTreeMap, column_names: Vec, @@ -439,15 +420,13 @@ struct MetaData { } impl MetaData { - pub fn new(rg: &RowGroup) -> Self { + pub fn new(meta: &row_group::MetaData) -> Self { Self { - size: rg.size(), - rows: u64::from(rg.rows()), - column_ranges: rg.column_ranges().clone(), - column_types: rg.column_types().clone(), - column_data_types: rg.column_logical_types().clone(), - column_names: rg.column_ranges().keys().cloned().collect(), - time_range: Some(rg.time_range()), + size: meta.size, + rows: meta.rows as u64, + columns: meta.columns.clone(), + column_names: meta.columns.keys().cloned().collect(), + time_range: Some(meta.time_range), } } @@ -460,11 +439,8 @@ impl MetaData { ) -> Vec<(ColumnType, LogicalDataType)> { names .iter() - .filter_map(|&name| match self.column_types.get(name) { - Some(column_type) => Some(( - column_type.clone(), - *self.column_data_types.get(name).unwrap(), - )), + .filter_map(|&name| match self.columns.get(name) { + Some(schema) => Some((schema.typ.clone(), schema.logical_data_type)), None => None, }) .collect::>() @@ -472,14 +448,9 @@ impl MetaData { // As `schema_for_column_names` but for all columns in the table. fn schema_for_all_columns(&self) -> Vec<(ColumnType, LogicalDataType)> { - self.column_types + self.columns .iter() - .map(|(name, column_type)| { - ( - column_type.clone(), - *self.column_data_types.get(name).unwrap(), - ) - }) + .map(|(name, schema)| (schema.typ.clone(), schema.logical_data_type)) .collect::>() } @@ -490,12 +461,8 @@ impl MetaData { ) -> Vec<(ColumnType, AggregateType, LogicalDataType)> { names .iter() - .filter_map(|(name, agg_type)| match self.column_types.get(*name) { - Some(column_type) => Some(( - column_type.clone(), - *agg_type, - *self.column_data_types.get(*name).unwrap(), - )), + .filter_map(|(name, agg_type)| match self.columns.get(*name) { + Some(schema) => Some((schema.typ.clone(), *agg_type, schema.logical_data_type)), None => None, }) .collect::>() @@ -505,22 +472,23 @@ impl MetaData { self.column_names.iter().map(|name| name.as_str()).collect() } - pub fn update(&mut self, rg: &RowGroup) { + pub fn update(&mut self, meta: &row_group::MetaData) { // update size, rows, column ranges, time range - self.size += rg.size(); - self.rows += u64::from(rg.rows()); + self.size += meta.size; + self.rows += meta.rows as u64; // The incoming row group must have exactly the same schema as the // existing row groups in the table. - assert_eq!(&self.column_types, rg.column_types()); - assert_eq!(&self.column_data_types, rg.column_logical_types()); + assert_eq!(&self.columns, &meta.columns); - assert_eq!(self.column_ranges.len(), rg.column_ranges().len()); - for (column_name, (column_range_min, column_range_max)) in rg.column_ranges() { - let mut curr_range = self - .column_ranges + // Update the table schema using the incoming row group schema + for (column_name, column_meta) in &meta.columns { + let (column_range_min, column_range_max) = &column_meta.range; + let mut curr_range = &mut self + .columns .get_mut(&column_name.to_string()) - .unwrap(); + .unwrap() + .range; if column_range_min < &curr_range.0 { curr_range.0 = column_range_min.clone(); } @@ -528,6 +496,16 @@ impl MetaData { if column_range_max > &curr_range.1 { curr_range.1 = column_range_max.clone(); } + + match self.time_range { + Some(time_range) => { + self.time_range = Some(( + time_range.0.min(meta.time_range.0), + time_range.1.max(meta.time_range.1), + )); + } + None => panic!("cannot call `update` on empty Metadata"), + } } } @@ -611,6 +589,7 @@ impl<'a> Display for DisplayReadFilterResults<'a> { // write out the schema of the first result as the table header std::fmt::Display::fmt(&self.0[0].schema(), f)?; + writeln!(f)?; // write out each row group result for row_group in self.0.iter() { @@ -729,12 +708,78 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> { #[cfg(test)] mod test { + use row_group::ColumnMeta; + use super::*; - use crate::column::Column; + use crate::column::{self, Column}; use crate::row_group::{BinaryExpr, ColumnType, ReadAggregateResult}; use crate::schema; use crate::schema::LogicalDataType; + #[test] + fn meta_data_update() { + let rg_meta = row_group::MetaData { + size: 100, + rows: 2000, + columns: vec![( + "region".to_owned(), + ColumnMeta { + typ: schema::ColumnType::Tag("region".to_owned()), + logical_data_type: schema::LogicalDataType::String, + range: ( + column::OwnedValue::String("north".to_owned()), + column::OwnedValue::String("south".to_owned()), + ), + }, + )] + .into_iter() + .collect::>(), + time_range: (10, 3000), + }; + + let mut meta = MetaData::new(&rg_meta); + assert_eq!(meta.rows, 2000); + assert_eq!(meta.size, 100); + assert_eq!(meta.time_range, Some((10, 3000))); + assert_eq!( + meta.columns.get("region").unwrap().range, + ( + column::OwnedValue::String("north".to_owned()), + column::OwnedValue::String("south".to_owned()) + ) + ); + + meta.update(&row_group::MetaData { + size: 300, + rows: 1500, + columns: vec![( + "region".to_owned(), + ColumnMeta { + typ: schema::ColumnType::Tag("region".to_owned()), + logical_data_type: schema::LogicalDataType::String, + range: ( + column::OwnedValue::String("east".to_owned()), + column::OwnedValue::String("north".to_owned()), + ), + }, + )] + .into_iter() + .collect::>(), + time_range: (10, 3500), + }); + + assert_eq!(meta.rows, 3500); + assert_eq!(meta.size, 400); + assert_eq!(meta.time_range, Some((10, 3500))); + assert_eq!( + meta.columns.get("region").unwrap().range, + ( + column::OwnedValue::String("east".to_owned()), + column::OwnedValue::String("south".to_owned()) + ) + ); + } + #[test] fn select() { // Build first segment. @@ -754,13 +799,21 @@ mod test { let mut table = Table::new("cpu".to_owned(), rg); let exp_col_types = vec![ - ("region".to_owned(), LogicalDataType::String), - ("count".to_owned(), LogicalDataType::Unsigned), - ("time".to_owned(), LogicalDataType::Integer), + ("region", LogicalDataType::String), + ("count", LogicalDataType::Unsigned), + ("time", LogicalDataType::Integer), ] .into_iter() .collect::>(); - assert_eq!(table.column_logical_types(), &exp_col_types); + assert_eq!( + table + .meta + .columns + .iter() + .map(|(k, v)| (k.as_str(), v.logical_data_type)) + .collect::>(), + exp_col_types + ); // Build another segment. let mut columns = BTreeMap::new(); From 5f6335573bc2023f5d7025812546288b43c384fc Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 18 Jan 2021 18:04:55 +0000 Subject: [PATCH 08/11] fix: ensure table missing handled --- read_buffer/src/chunk.rs | 15 ++++++++++++--- read_buffer/src/lib.rs | 22 +++++++++++++++++----- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 01f614808b..c4b489cedf 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -59,6 +59,11 @@ impl Chunk { self.tables.len() } + /// Returns true if the chunk contains data for this table. + pub fn has_table(&self, table_name: &str) -> bool { + self.tables.contains_key(table_name) + } + /// Returns true if there are no tables under this chunk. pub fn is_empty(&self) -> bool { self.tables() == 0 @@ -97,10 +102,14 @@ impl Chunk { table_name: &str, predicate: &Predicate, select_columns: &ColumnSelection<'_>, - ) -> Option> { + ) -> Result, Error> { + // Lookup table by name and dispatch execution. match self.tables.get(table_name) { - Some(table) => Some(table.read_filter(select_columns, predicate)), - None => None, + Some(table) => Ok(table.read_filter(select_columns, predicate)), + None => crate::TableNotFound { + table_name: table_name.to_owned(), + } + .fail(), } } diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index 490ad54351..55cb22fc59 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -20,7 +20,7 @@ use arrow_deps::arrow::{ datatypes::{DataType::Utf8, Field, Schema}, record_batch::RecordBatch, }; -use snafu::{ResultExt, Snafu}; +use snafu::{ensure, ResultExt, Snafu}; // Identifiers that are exported as part of the public API. pub use row_group::{BinaryExpr, Predicate}; @@ -205,6 +205,18 @@ impl Database { Some(partition) => { let mut chunks = vec![]; for chunk_id in chunk_ids { + let chunk = partition + .chunks + .get(chunk_id) + .ok_or_else(|| Error::ChunkNotFound { id: *chunk_id })?; + + ensure!( + chunk.has_table(table_name), + TableNotFound { + table_name: table_name.to_owned(), + } + ); + chunks.push( partition .chunks @@ -518,10 +530,10 @@ impl<'input, 'chunk> Iterator for ReadFilterResults<'input, 'chunk> { // Try next chunk's table. if self.curr_table_results.is_none() { - self.curr_table_results = self.chunks[self.next_i].read_filter( - self.table_name, - &self.predicate, - &self.select_columns, + self.curr_table_results = Some( + self.chunks[self.next_i] + .read_filter(self.table_name, &self.predicate, &self.select_columns) + .unwrap(), ); } From dbdd885e58eeef2e5b27644f22effa22227009b7 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 18 Jan 2021 18:09:25 +0000 Subject: [PATCH 09/11] refactor: follow snafu style guide --- read_buffer/src/lib.rs | 5 ----- read_buffer/src/row_group.rs | 22 +++++++++++++++++++--- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index 55cb22fc59..2a7374c7dd 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -53,11 +53,6 @@ pub enum Error { #[snafu(display("unsupported operation: {}", msg))] UnsupportedOperation { msg: String }, - - #[snafu(display("schema conversion error: {}", source))] - SchemaError { - source: data_types::schema::builder::Error, - }, } pub type Result = std::result::Result; diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index e6e31339b2..de0e21597d 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -6,6 +6,7 @@ use std::{ use hashbrown::{hash_map, HashMap}; use itertools::Itertools; +use snafu::Snafu; use crate::column::{ cmp::Operator, AggregateResult, Column, EncodedValues, OwnedValue, RowIDs, RowIDsOption, @@ -23,6 +24,21 @@ use data_types::schema::{InfluxColumnType, Schema}; /// The name used for a timestamp column. pub const TIME_COLUMN_NAME: &str = data_types::TIME_COLUMN_NAME; +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("arrow conversion error: {}", source))] + ArrowError { + source: arrow_deps::arrow::error::ArrowError, + }, + + #[snafu(display("schema conversion error: {}", source))] + SchemaError { + source: data_types::schema::builder::Error, + }, +} + +pub type Result = std::result::Result; + /// A `RowGroup` is an immutable horizontal chunk of a single `Table`. By /// definition it has the same schema as all the other read groups in the table. /// All the columns within the `RowGroup` must have the same number of logical @@ -1333,11 +1349,11 @@ impl ReadFilterResult<'_> { } impl TryFrom> for RecordBatch { - type Error = crate::Error; + type Error = Error; fn try_from(result: ReadFilterResult<'_>) -> Result { let schema = data_types::schema::Schema::try_from(result.schema()) - .map_err(|source| crate::Error::SchemaError { source })?; + .map_err(|source| Error::SchemaError { source })?; let arrow_schema: arrow_deps::arrow::datatypes::SchemaRef = schema.into(); let columns = result @@ -1350,7 +1366,7 @@ impl TryFrom> for RecordBatch { // of rows on columns differ. We have full control over both so there // should never be an error to return... arrow::record_batch::RecordBatch::try_new(arrow_schema, columns) - .map_err(|source| crate::Error::ArrowError { source }) + .map_err(|source| Error::ArrowError { source }) } } From 57365b082c6bf220234b23072d8feb9c446c18ce Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 19 Jan 2021 15:10:56 +0000 Subject: [PATCH 10/11] refactor: clean up filter_map --- read_buffer/src/table.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 293fc5163d..89c9a53ad5 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -461,9 +461,10 @@ impl MetaData { ) -> Vec<(ColumnType, AggregateType, LogicalDataType)> { names .iter() - .filter_map(|(name, agg_type)| match self.columns.get(*name) { - Some(schema) => Some((schema.typ.clone(), *agg_type, schema.logical_data_type)), - None => None, + .filter_map(|(name, agg_type)| { + self.columns + .get(*name) + .map(|schema| (schema.typ.clone(), *agg_type, schema.logical_data_type)) }) .collect::>() } From 221ed86853e58aca8be5993ae0599be679279ffb Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 19 Jan 2021 15:12:20 +0000 Subject: [PATCH 11/11] refactor: apply suggestions from code review Co-authored-by: Andrew Lamb --- read_buffer/src/lib.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs index 2a7374c7dd..5ce2e9ee3a 100644 --- a/read_buffer/src/lib.rs +++ b/read_buffer/src/lib.rs @@ -20,7 +20,7 @@ use arrow_deps::arrow::{ datatypes::{DataType::Utf8, Field, Schema}, record_batch::RecordBatch, }; -use snafu::{ensure, ResultExt, Snafu}; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; // Identifiers that are exported as part of the public API. pub use row_group::{BinaryExpr, Predicate}; @@ -203,14 +203,9 @@ impl Database { let chunk = partition .chunks .get(chunk_id) - .ok_or_else(|| Error::ChunkNotFound { id: *chunk_id })?; + .context(ChunkNotFound { id: *chunk_id })?; - ensure!( - chunk.has_table(table_name), - TableNotFound { - table_name: table_name.to_owned(), - } - ); + ensure!(chunk.has_table(table_name), TableNotFound { table_name }); chunks.push( partition