diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 84637dcf8f..e6e31339b2 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -52,24 +52,25 @@ impl RowGroup { ColumnType::Tag(c) => { assert_eq!(c.num_rows(), rows); - meta.column_ranges - .insert(name.clone(), c.column_range().unwrap()); - meta.column_data_types - .insert(name.clone(), c.logical_datatype()); - meta.column_types - .insert(name.clone(), schema::ColumnType::Tag(name.clone())); + meta.add_column( + &name, + schema::ColumnType::Tag(name.clone()), + c.logical_datatype(), + c.column_range().unwrap(), + ); + all_columns_by_name.insert(name.clone(), all_columns.len()); all_columns.push(c); } ColumnType::Field(c) => { assert_eq!(c.num_rows(), rows); - meta.column_ranges - .insert(name.clone(), c.column_range().unwrap()); - meta.column_data_types - .insert(name.clone(), c.logical_datatype()); - meta.column_types - .insert(name.clone(), schema::ColumnType::Field(name.clone())); + meta.add_column( + &name, + schema::ColumnType::Field(name.clone()), + c.logical_datatype(), + c.column_range().unwrap(), + ); all_columns_by_name.insert(name.clone(), all_columns.len()); all_columns.push(c); } @@ -85,12 +86,12 @@ impl RowGroup { Some((_, _)) => unreachable!("unexpected types for time range"), }; - meta.column_ranges - .insert(name.clone(), c.column_range().unwrap()); - meta.column_data_types - .insert(name.clone(), c.logical_datatype()); - meta.column_types - .insert(name.clone(), schema::ColumnType::Timestamp(name.clone())); + meta.add_column( + &name, + schema::ColumnType::Timestamp(name.clone()), + c.logical_datatype(), + c.column_range().unwrap(), + ); all_columns_by_name.insert(name.clone(), all_columns.len()); time_column = Some(all_columns.len()); @@ -100,10 +101,7 @@ impl RowGroup { } // Meta data should have same columns for types and ranges. - assert_eq!( - meta.column_data_types.keys().collect::>(), - meta.column_ranges.keys().collect::>(), - ); + assert_eq!(meta.columns.keys().len(), all_columns.len()); Self { meta, @@ -124,19 +122,9 @@ impl RowGroup { self.meta.rows } - /// The ranges on each column in the `RowGroup`. - pub fn column_ranges(&self) -> &BTreeMap { - &self.meta.column_ranges - } - - /// The semantic types of each column. - pub fn column_types(&self) -> &BTreeMap { - &self.meta.column_types - } - - /// The logical data-types of each column. - pub fn column_logical_types(&self) -> &BTreeMap { - &self.meta.column_data_types + // The row group's meta data. + pub fn metadata(&self) -> &MetaData { + &self.meta } // Returns a reference to a column from the column name. @@ -1203,13 +1191,28 @@ impl ColumnType { } } +#[derive(Debug, Clone)] +pub struct ColumnMeta { + pub typ: crate::schema::ColumnType, + pub logical_data_type: LogicalDataType, + pub range: (OwnedValue, OwnedValue), +} + +// column metadata is equivalent for two columns if their logical type and +// semantic type are equivalent. +impl PartialEq for ColumnMeta { + fn eq(&self, other: &Self) -> bool { + self.typ == other.typ && self.logical_data_type == other.logical_data_type + } +} + #[derive(Default, Debug)] -struct MetaData { +pub struct MetaData { // The total size of the table in bytes. - size: u64, + pub size: u64, // The total number of rows in the table. - rows: u32, + pub rows: u32, // The distinct set of columns for this `RowGroup` (all of these columns // will appear in all of the `Table`'s `RowGroup`s) and the range of values @@ -1217,20 +1220,14 @@ struct MetaData { // // This can be used to skip the table entirely if a logical predicate can't // possibly match based on the range of values a column has. - column_ranges: BTreeMap, - - // The semantic type of the columns in the row group - column_types: BTreeMap, - - // The logical data type of the columns in the row group. - column_data_types: BTreeMap, + pub columns: BTreeMap, // The total time range of this table spanning all of the `RowGroup`s within // the table. // // This can be used to skip the table entirely if the time range for a query // falls outside of this range. - time_range: (i64, i64), + pub time_range: (i64, i64), } impl MetaData { @@ -1239,8 +1236,8 @@ impl MetaData { // no rows in the `RowGroup` would ever match the expression. // pub fn column_could_satisfy_binary_expr(&self, expr: &BinaryExpr) -> bool { - let (column_min, column_max) = match self.column_ranges.get(expr.column()) { - Some(range) => range, + let (column_min, column_max) = match self.columns.get(expr.column()) { + Some(schema) => &schema.range, None => return false, // column doesn't exist. }; @@ -1272,6 +1269,23 @@ impl MetaData { } } + pub fn add_column( + &mut self, + name: &str, + col_type: schema::ColumnType, + logical_data_type: LogicalDataType, + range: (OwnedValue, OwnedValue), + ) { + self.columns.insert( + name.to_owned(), + ColumnMeta { + typ: col_type, + logical_data_type, + range, + }, + ); + } + // Extract schema information for a set of columns. fn schema_for_column_names( &self, @@ -1280,9 +1294,8 @@ impl MetaData { names .iter() .map(|&name| { - let col_type = self.column_types.get(name).unwrap(); - let data_type = self.column_data_types.get(name).unwrap(); - (col_type.clone(), *data_type) + let schema = self.columns.get(name).unwrap(); + (schema.typ.clone(), schema.logical_data_type) }) .collect::>() } @@ -1295,10 +1308,8 @@ impl MetaData { columns .iter() .map(|(name, agg_type)| { - let col_type = self.column_types.get(*name).unwrap(); - let data_type = self.column_data_types.get(*name).unwrap(); - - (col_type.clone(), *agg_type, *data_type) + let schema = self.columns.get(*name).unwrap(); + (schema.typ.clone(), *agg_type, schema.logical_data_type) }) .collect::>() } @@ -1347,6 +1358,7 @@ impl std::fmt::Debug for &ReadFilterResult<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // Display the header std::fmt::Display::fmt(self.schema(), f)?; + writeln!(f)?; // Display the rest of the values. std::fmt::Display::fmt(&self, f) @@ -2037,4 +2049,38 @@ west,host-d,11,9 " ); } + + #[test] + fn column_meta_equal() { + let col1 = ColumnMeta { + typ: schema::ColumnType::Tag("region".to_owned()), + logical_data_type: schema::LogicalDataType::String, + range: ( + OwnedValue::String("east".to_owned()), + OwnedValue::String("west".to_owned()), + ), + }; + + let col2 = ColumnMeta { + typ: schema::ColumnType::Tag("region".to_owned()), + logical_data_type: schema::LogicalDataType::String, + range: ( + OwnedValue::String("north".to_owned()), + OwnedValue::String("west".to_owned()), + ), + }; + + let col3 = ColumnMeta { + typ: schema::ColumnType::Tag("host".to_owned()), + logical_data_type: schema::LogicalDataType::String, + range: ( + OwnedValue::String("east".to_owned()), + OwnedValue::String("west".to_owned()), + ), + }; + + assert_eq!(col1, col2); + assert_ne!(col1, col3); + assert_ne!(col2, col3); + } } diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 5f39d74d6a..293fc5163d 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Display; use std::slice::Iter; -use crate::column::{AggregateResult, OwnedValue, Scalar, Value}; +use crate::column::{AggregateResult, Scalar, Value}; use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup}; use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema}; @@ -36,14 +36,14 @@ impl Table { pub fn new(name: String, rg: RowGroup) -> Self { Self { name, - meta: MetaData::new(&rg), + meta: MetaData::new(rg.metadata()), row_groups: vec![rg], } } /// Add a new row group to this table. pub fn add_row_group(&mut self, rg: RowGroup) { - self.meta.update(&rg); + self.meta.update(rg.metadata()); self.row_groups.push(rg); } @@ -87,16 +87,6 @@ impl Table { self.meta.time_range } - /// The ranges on each column in the table (across all row groups). - pub fn column_ranges(&self) -> BTreeMap { - todo!() - } - - /// The logical data-type of each column in the `Table`'s schema. - pub fn column_logical_types(&self) -> &BTreeMap { - &self.meta.column_data_types - } - // Identify set of row groups that might satisfy the predicate. fn filter_row_groups(&self, predicate: &Predicate) -> Vec<&RowGroup> { let mut rgs = Vec::with_capacity(self.row_groups.len()); @@ -415,18 +405,9 @@ struct MetaData { rows: u64, // The distinct set of columns for this table (all of these columns will - // appear in all of the table's row groups) and the range of values for - // each of those columns. - // - // This can be used to skip the table entirely if a logical predicate can't - // possibly match based on the range of values a column has. - column_ranges: BTreeMap, - - // The semantic type of all columns in the table. - column_types: BTreeMap, - - // The logical data type of all columns in the table. - column_data_types: BTreeMap, + // appear in all of the table's row groups) and meta data about those + // columns including their schema and range. + columns: BTreeMap, column_names: Vec, @@ -439,15 +420,13 @@ struct MetaData { } impl MetaData { - pub fn new(rg: &RowGroup) -> Self { + pub fn new(meta: &row_group::MetaData) -> Self { Self { - size: rg.size(), - rows: u64::from(rg.rows()), - column_ranges: rg.column_ranges().clone(), - column_types: rg.column_types().clone(), - column_data_types: rg.column_logical_types().clone(), - column_names: rg.column_ranges().keys().cloned().collect(), - time_range: Some(rg.time_range()), + size: meta.size, + rows: meta.rows as u64, + columns: meta.columns.clone(), + column_names: meta.columns.keys().cloned().collect(), + time_range: Some(meta.time_range), } } @@ -460,11 +439,8 @@ impl MetaData { ) -> Vec<(ColumnType, LogicalDataType)> { names .iter() - .filter_map(|&name| match self.column_types.get(name) { - Some(column_type) => Some(( - column_type.clone(), - *self.column_data_types.get(name).unwrap(), - )), + .filter_map(|&name| match self.columns.get(name) { + Some(schema) => Some((schema.typ.clone(), schema.logical_data_type)), None => None, }) .collect::>() @@ -472,14 +448,9 @@ impl MetaData { // As `schema_for_column_names` but for all columns in the table. fn schema_for_all_columns(&self) -> Vec<(ColumnType, LogicalDataType)> { - self.column_types + self.columns .iter() - .map(|(name, column_type)| { - ( - column_type.clone(), - *self.column_data_types.get(name).unwrap(), - ) - }) + .map(|(name, schema)| (schema.typ.clone(), schema.logical_data_type)) .collect::>() } @@ -490,12 +461,8 @@ impl MetaData { ) -> Vec<(ColumnType, AggregateType, LogicalDataType)> { names .iter() - .filter_map(|(name, agg_type)| match self.column_types.get(*name) { - Some(column_type) => Some(( - column_type.clone(), - *agg_type, - *self.column_data_types.get(*name).unwrap(), - )), + .filter_map(|(name, agg_type)| match self.columns.get(*name) { + Some(schema) => Some((schema.typ.clone(), *agg_type, schema.logical_data_type)), None => None, }) .collect::>() @@ -505,22 +472,23 @@ impl MetaData { self.column_names.iter().map(|name| name.as_str()).collect() } - pub fn update(&mut self, rg: &RowGroup) { + pub fn update(&mut self, meta: &row_group::MetaData) { // update size, rows, column ranges, time range - self.size += rg.size(); - self.rows += u64::from(rg.rows()); + self.size += meta.size; + self.rows += meta.rows as u64; // The incoming row group must have exactly the same schema as the // existing row groups in the table. - assert_eq!(&self.column_types, rg.column_types()); - assert_eq!(&self.column_data_types, rg.column_logical_types()); + assert_eq!(&self.columns, &meta.columns); - assert_eq!(self.column_ranges.len(), rg.column_ranges().len()); - for (column_name, (column_range_min, column_range_max)) in rg.column_ranges() { - let mut curr_range = self - .column_ranges + // Update the table schema using the incoming row group schema + for (column_name, column_meta) in &meta.columns { + let (column_range_min, column_range_max) = &column_meta.range; + let mut curr_range = &mut self + .columns .get_mut(&column_name.to_string()) - .unwrap(); + .unwrap() + .range; if column_range_min < &curr_range.0 { curr_range.0 = column_range_min.clone(); } @@ -528,6 +496,16 @@ impl MetaData { if column_range_max > &curr_range.1 { curr_range.1 = column_range_max.clone(); } + + match self.time_range { + Some(time_range) => { + self.time_range = Some(( + time_range.0.min(meta.time_range.0), + time_range.1.max(meta.time_range.1), + )); + } + None => panic!("cannot call `update` on empty Metadata"), + } } } @@ -611,6 +589,7 @@ impl<'a> Display for DisplayReadFilterResults<'a> { // write out the schema of the first result as the table header std::fmt::Display::fmt(&self.0[0].schema(), f)?; + writeln!(f)?; // write out each row group result for row_group in self.0.iter() { @@ -729,12 +708,78 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> { #[cfg(test)] mod test { + use row_group::ColumnMeta; + use super::*; - use crate::column::Column; + use crate::column::{self, Column}; use crate::row_group::{BinaryExpr, ColumnType, ReadAggregateResult}; use crate::schema; use crate::schema::LogicalDataType; + #[test] + fn meta_data_update() { + let rg_meta = row_group::MetaData { + size: 100, + rows: 2000, + columns: vec![( + "region".to_owned(), + ColumnMeta { + typ: schema::ColumnType::Tag("region".to_owned()), + logical_data_type: schema::LogicalDataType::String, + range: ( + column::OwnedValue::String("north".to_owned()), + column::OwnedValue::String("south".to_owned()), + ), + }, + )] + .into_iter() + .collect::>(), + time_range: (10, 3000), + }; + + let mut meta = MetaData::new(&rg_meta); + assert_eq!(meta.rows, 2000); + assert_eq!(meta.size, 100); + assert_eq!(meta.time_range, Some((10, 3000))); + assert_eq!( + meta.columns.get("region").unwrap().range, + ( + column::OwnedValue::String("north".to_owned()), + column::OwnedValue::String("south".to_owned()) + ) + ); + + meta.update(&row_group::MetaData { + size: 300, + rows: 1500, + columns: vec![( + "region".to_owned(), + ColumnMeta { + typ: schema::ColumnType::Tag("region".to_owned()), + logical_data_type: schema::LogicalDataType::String, + range: ( + column::OwnedValue::String("east".to_owned()), + column::OwnedValue::String("north".to_owned()), + ), + }, + )] + .into_iter() + .collect::>(), + time_range: (10, 3500), + }); + + assert_eq!(meta.rows, 3500); + assert_eq!(meta.size, 400); + assert_eq!(meta.time_range, Some((10, 3500))); + assert_eq!( + meta.columns.get("region").unwrap().range, + ( + column::OwnedValue::String("east".to_owned()), + column::OwnedValue::String("south".to_owned()) + ) + ); + } + #[test] fn select() { // Build first segment. @@ -754,13 +799,21 @@ mod test { let mut table = Table::new("cpu".to_owned(), rg); let exp_col_types = vec![ - ("region".to_owned(), LogicalDataType::String), - ("count".to_owned(), LogicalDataType::Unsigned), - ("time".to_owned(), LogicalDataType::Integer), + ("region", LogicalDataType::String), + ("count", LogicalDataType::Unsigned), + ("time", LogicalDataType::Integer), ] .into_iter() .collect::>(); - assert_eq!(table.column_logical_types(), &exp_col_types); + assert_eq!( + table + .meta + .columns + .iter() + .map(|(k, v)| (k.as_str(), v.logical_data_type)) + .collect::>(), + exp_col_types + ); // Build another segment. let mut columns = BTreeMap::new();