refactor: encapsulate column meta data

pull/24376/head
Edd Robinson 2021-01-18 17:53:06 +00:00
parent 0c7424465d
commit 9f65c4b6ef
2 changed files with 220 additions and 121 deletions

View File

@ -52,24 +52,25 @@ impl RowGroup {
ColumnType::Tag(c) => {
assert_eq!(c.num_rows(), rows);
meta.column_ranges
.insert(name.clone(), c.column_range().unwrap());
meta.column_data_types
.insert(name.clone(), c.logical_datatype());
meta.column_types
.insert(name.clone(), schema::ColumnType::Tag(name.clone()));
meta.add_column(
&name,
schema::ColumnType::Tag(name.clone()),
c.logical_datatype(),
c.column_range().unwrap(),
);
all_columns_by_name.insert(name.clone(), all_columns.len());
all_columns.push(c);
}
ColumnType::Field(c) => {
assert_eq!(c.num_rows(), rows);
meta.column_ranges
.insert(name.clone(), c.column_range().unwrap());
meta.column_data_types
.insert(name.clone(), c.logical_datatype());
meta.column_types
.insert(name.clone(), schema::ColumnType::Field(name.clone()));
meta.add_column(
&name,
schema::ColumnType::Field(name.clone()),
c.logical_datatype(),
c.column_range().unwrap(),
);
all_columns_by_name.insert(name.clone(), all_columns.len());
all_columns.push(c);
}
@ -85,12 +86,12 @@ impl RowGroup {
Some((_, _)) => unreachable!("unexpected types for time range"),
};
meta.column_ranges
.insert(name.clone(), c.column_range().unwrap());
meta.column_data_types
.insert(name.clone(), c.logical_datatype());
meta.column_types
.insert(name.clone(), schema::ColumnType::Timestamp(name.clone()));
meta.add_column(
&name,
schema::ColumnType::Timestamp(name.clone()),
c.logical_datatype(),
c.column_range().unwrap(),
);
all_columns_by_name.insert(name.clone(), all_columns.len());
time_column = Some(all_columns.len());
@ -100,10 +101,7 @@ impl RowGroup {
}
// Meta data should have same columns for types and ranges.
assert_eq!(
meta.column_data_types.keys().collect::<Vec<_>>(),
meta.column_ranges.keys().collect::<Vec<_>>(),
);
assert_eq!(meta.columns.keys().len(), all_columns.len());
Self {
meta,
@ -124,19 +122,9 @@ impl RowGroup {
self.meta.rows
}
/// The ranges on each column in the `RowGroup`.
pub fn column_ranges(&self) -> &BTreeMap<String, (OwnedValue, OwnedValue)> {
&self.meta.column_ranges
}
/// The semantic types of each column.
pub fn column_types(&self) -> &BTreeMap<String, crate::schema::ColumnType> {
&self.meta.column_types
}
/// The logical data-types of each column.
pub fn column_logical_types(&self) -> &BTreeMap<String, LogicalDataType> {
&self.meta.column_data_types
// The row group's meta data.
pub fn metadata(&self) -> &MetaData {
&self.meta
}
// Returns a reference to a column from the column name.
@ -1203,13 +1191,28 @@ impl ColumnType {
}
}
#[derive(Debug, Clone)]
pub struct ColumnMeta {
pub typ: crate::schema::ColumnType,
pub logical_data_type: LogicalDataType,
pub range: (OwnedValue, OwnedValue),
}
// column metadata is equivalent for two columns if their logical type and
// semantic type are equivalent.
impl PartialEq for ColumnMeta {
fn eq(&self, other: &Self) -> bool {
self.typ == other.typ && self.logical_data_type == other.logical_data_type
}
}
#[derive(Default, Debug)]
struct MetaData {
pub struct MetaData {
// The total size of the table in bytes.
size: u64,
pub size: u64,
// The total number of rows in the table.
rows: u32,
pub rows: u32,
// The distinct set of columns for this `RowGroup` (all of these columns
// will appear in all of the `Table`'s `RowGroup`s) and the range of values
@ -1217,20 +1220,14 @@ struct MetaData {
//
// This can be used to skip the table entirely if a logical predicate can't
// possibly match based on the range of values a column has.
column_ranges: BTreeMap<String, (OwnedValue, OwnedValue)>,
// The semantic type of the columns in the row group
column_types: BTreeMap<String, crate::schema::ColumnType>,
// The logical data type of the columns in the row group.
column_data_types: BTreeMap<String, LogicalDataType>,
pub columns: BTreeMap<String, ColumnMeta>,
// The total time range of this table spanning all of the `RowGroup`s within
// the table.
//
// This can be used to skip the table entirely if the time range for a query
// falls outside of this range.
time_range: (i64, i64),
pub time_range: (i64, i64),
}
impl MetaData {
@ -1239,8 +1236,8 @@ impl MetaData {
// no rows in the `RowGroup` would ever match the expression.
//
pub fn column_could_satisfy_binary_expr(&self, expr: &BinaryExpr) -> bool {
let (column_min, column_max) = match self.column_ranges.get(expr.column()) {
Some(range) => range,
let (column_min, column_max) = match self.columns.get(expr.column()) {
Some(schema) => &schema.range,
None => return false, // column doesn't exist.
};
@ -1272,6 +1269,23 @@ impl MetaData {
}
}
pub fn add_column(
&mut self,
name: &str,
col_type: schema::ColumnType,
logical_data_type: LogicalDataType,
range: (OwnedValue, OwnedValue),
) {
self.columns.insert(
name.to_owned(),
ColumnMeta {
typ: col_type,
logical_data_type,
range,
},
);
}
// Extract schema information for a set of columns.
fn schema_for_column_names(
&self,
@ -1280,9 +1294,8 @@ impl MetaData {
names
.iter()
.map(|&name| {
let col_type = self.column_types.get(name).unwrap();
let data_type = self.column_data_types.get(name).unwrap();
(col_type.clone(), *data_type)
let schema = self.columns.get(name).unwrap();
(schema.typ.clone(), schema.logical_data_type)
})
.collect::<Vec<_>>()
}
@ -1295,10 +1308,8 @@ impl MetaData {
columns
.iter()
.map(|(name, agg_type)| {
let col_type = self.column_types.get(*name).unwrap();
let data_type = self.column_data_types.get(*name).unwrap();
(col_type.clone(), *agg_type, *data_type)
let schema = self.columns.get(*name).unwrap();
(schema.typ.clone(), *agg_type, schema.logical_data_type)
})
.collect::<Vec<_>>()
}
@ -1347,6 +1358,7 @@ impl std::fmt::Debug for &ReadFilterResult<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Display the header
std::fmt::Display::fmt(self.schema(), f)?;
writeln!(f)?;
// Display the rest of the values.
std::fmt::Display::fmt(&self, f)
@ -2037,4 +2049,38 @@ west,host-d,11,9
"
);
}
#[test]
fn column_meta_equal() {
let col1 = ColumnMeta {
typ: schema::ColumnType::Tag("region".to_owned()),
logical_data_type: schema::LogicalDataType::String,
range: (
OwnedValue::String("east".to_owned()),
OwnedValue::String("west".to_owned()),
),
};
let col2 = ColumnMeta {
typ: schema::ColumnType::Tag("region".to_owned()),
logical_data_type: schema::LogicalDataType::String,
range: (
OwnedValue::String("north".to_owned()),
OwnedValue::String("west".to_owned()),
),
};
let col3 = ColumnMeta {
typ: schema::ColumnType::Tag("host".to_owned()),
logical_data_type: schema::LogicalDataType::String,
range: (
OwnedValue::String("east".to_owned()),
OwnedValue::String("west".to_owned()),
),
};
assert_eq!(col1, col2);
assert_ne!(col1, col3);
assert_ne!(col2, col3);
}
}

View File

@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Display;
use std::slice::Iter;
use crate::column::{AggregateResult, OwnedValue, Scalar, Value};
use crate::column::{AggregateResult, Scalar, Value};
use crate::row_group::{self, ColumnName, GroupKey, Predicate, RowGroup};
use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema};
@ -36,14 +36,14 @@ impl Table {
pub fn new(name: String, rg: RowGroup) -> Self {
Self {
name,
meta: MetaData::new(&rg),
meta: MetaData::new(rg.metadata()),
row_groups: vec![rg],
}
}
/// Add a new row group to this table.
pub fn add_row_group(&mut self, rg: RowGroup) {
self.meta.update(&rg);
self.meta.update(rg.metadata());
self.row_groups.push(rg);
}
@ -87,16 +87,6 @@ impl Table {
self.meta.time_range
}
/// The ranges on each column in the table (across all row groups).
pub fn column_ranges(&self) -> BTreeMap<String, (OwnedValue, OwnedValue)> {
todo!()
}
/// The logical data-type of each column in the `Table`'s schema.
pub fn column_logical_types(&self) -> &BTreeMap<String, LogicalDataType> {
&self.meta.column_data_types
}
// Identify set of row groups that might satisfy the predicate.
fn filter_row_groups(&self, predicate: &Predicate) -> Vec<&RowGroup> {
let mut rgs = Vec::with_capacity(self.row_groups.len());
@ -415,18 +405,9 @@ struct MetaData {
rows: u64,
// The distinct set of columns for this table (all of these columns will
// appear in all of the table's row groups) and the range of values for
// each of those columns.
//
// This can be used to skip the table entirely if a logical predicate can't
// possibly match based on the range of values a column has.
column_ranges: BTreeMap<String, (OwnedValue, OwnedValue)>,
// The semantic type of all columns in the table.
column_types: BTreeMap<String, crate::schema::ColumnType>,
// The logical data type of all columns in the table.
column_data_types: BTreeMap<String, LogicalDataType>,
// appear in all of the table's row groups) and meta data about those
// columns including their schema and range.
columns: BTreeMap<String, row_group::ColumnMeta>,
column_names: Vec<String>,
@ -439,15 +420,13 @@ struct MetaData {
}
impl MetaData {
pub fn new(rg: &RowGroup) -> Self {
pub fn new(meta: &row_group::MetaData) -> Self {
Self {
size: rg.size(),
rows: u64::from(rg.rows()),
column_ranges: rg.column_ranges().clone(),
column_types: rg.column_types().clone(),
column_data_types: rg.column_logical_types().clone(),
column_names: rg.column_ranges().keys().cloned().collect(),
time_range: Some(rg.time_range()),
size: meta.size,
rows: meta.rows as u64,
columns: meta.columns.clone(),
column_names: meta.columns.keys().cloned().collect(),
time_range: Some(meta.time_range),
}
}
@ -460,11 +439,8 @@ impl MetaData {
) -> Vec<(ColumnType, LogicalDataType)> {
names
.iter()
.filter_map(|&name| match self.column_types.get(name) {
Some(column_type) => Some((
column_type.clone(),
*self.column_data_types.get(name).unwrap(),
)),
.filter_map(|&name| match self.columns.get(name) {
Some(schema) => Some((schema.typ.clone(), schema.logical_data_type)),
None => None,
})
.collect::<Vec<_>>()
@ -472,14 +448,9 @@ impl MetaData {
// As `schema_for_column_names` but for all columns in the table.
fn schema_for_all_columns(&self) -> Vec<(ColumnType, LogicalDataType)> {
self.column_types
self.columns
.iter()
.map(|(name, column_type)| {
(
column_type.clone(),
*self.column_data_types.get(name).unwrap(),
)
})
.map(|(name, schema)| (schema.typ.clone(), schema.logical_data_type))
.collect::<Vec<_>>()
}
@ -490,12 +461,8 @@ impl MetaData {
) -> Vec<(ColumnType, AggregateType, LogicalDataType)> {
names
.iter()
.filter_map(|(name, agg_type)| match self.column_types.get(*name) {
Some(column_type) => Some((
column_type.clone(),
*agg_type,
*self.column_data_types.get(*name).unwrap(),
)),
.filter_map(|(name, agg_type)| match self.columns.get(*name) {
Some(schema) => Some((schema.typ.clone(), *agg_type, schema.logical_data_type)),
None => None,
})
.collect::<Vec<_>>()
@ -505,22 +472,23 @@ impl MetaData {
self.column_names.iter().map(|name| name.as_str()).collect()
}
pub fn update(&mut self, rg: &RowGroup) {
pub fn update(&mut self, meta: &row_group::MetaData) {
// update size, rows, column ranges, time range
self.size += rg.size();
self.rows += u64::from(rg.rows());
self.size += meta.size;
self.rows += meta.rows as u64;
// The incoming row group must have exactly the same schema as the
// existing row groups in the table.
assert_eq!(&self.column_types, rg.column_types());
assert_eq!(&self.column_data_types, rg.column_logical_types());
assert_eq!(&self.columns, &meta.columns);
assert_eq!(self.column_ranges.len(), rg.column_ranges().len());
for (column_name, (column_range_min, column_range_max)) in rg.column_ranges() {
let mut curr_range = self
.column_ranges
// Update the table schema using the incoming row group schema
for (column_name, column_meta) in &meta.columns {
let (column_range_min, column_range_max) = &column_meta.range;
let mut curr_range = &mut self
.columns
.get_mut(&column_name.to_string())
.unwrap();
.unwrap()
.range;
if column_range_min < &curr_range.0 {
curr_range.0 = column_range_min.clone();
}
@ -528,6 +496,16 @@ impl MetaData {
if column_range_max > &curr_range.1 {
curr_range.1 = column_range_max.clone();
}
match self.time_range {
Some(time_range) => {
self.time_range = Some((
time_range.0.min(meta.time_range.0),
time_range.1.max(meta.time_range.1),
));
}
None => panic!("cannot call `update` on empty Metadata"),
}
}
}
@ -611,6 +589,7 @@ impl<'a> Display for DisplayReadFilterResults<'a> {
// write out the schema of the first result as the table header
std::fmt::Display::fmt(&self.0[0].schema(), f)?;
writeln!(f)?;
// write out each row group result
for row_group in self.0.iter() {
@ -729,12 +708,78 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> {
#[cfg(test)]
mod test {
use row_group::ColumnMeta;
use super::*;
use crate::column::Column;
use crate::column::{self, Column};
use crate::row_group::{BinaryExpr, ColumnType, ReadAggregateResult};
use crate::schema;
use crate::schema::LogicalDataType;
#[test]
fn meta_data_update() {
let rg_meta = row_group::MetaData {
size: 100,
rows: 2000,
columns: vec![(
"region".to_owned(),
ColumnMeta {
typ: schema::ColumnType::Tag("region".to_owned()),
logical_data_type: schema::LogicalDataType::String,
range: (
column::OwnedValue::String("north".to_owned()),
column::OwnedValue::String("south".to_owned()),
),
},
)]
.into_iter()
.collect::<BTreeMap<_, _>>(),
time_range: (10, 3000),
};
let mut meta = MetaData::new(&rg_meta);
assert_eq!(meta.rows, 2000);
assert_eq!(meta.size, 100);
assert_eq!(meta.time_range, Some((10, 3000)));
assert_eq!(
meta.columns.get("region").unwrap().range,
(
column::OwnedValue::String("north".to_owned()),
column::OwnedValue::String("south".to_owned())
)
);
meta.update(&row_group::MetaData {
size: 300,
rows: 1500,
columns: vec![(
"region".to_owned(),
ColumnMeta {
typ: schema::ColumnType::Tag("region".to_owned()),
logical_data_type: schema::LogicalDataType::String,
range: (
column::OwnedValue::String("east".to_owned()),
column::OwnedValue::String("north".to_owned()),
),
},
)]
.into_iter()
.collect::<BTreeMap<_, _>>(),
time_range: (10, 3500),
});
assert_eq!(meta.rows, 3500);
assert_eq!(meta.size, 400);
assert_eq!(meta.time_range, Some((10, 3500)));
assert_eq!(
meta.columns.get("region").unwrap().range,
(
column::OwnedValue::String("east".to_owned()),
column::OwnedValue::String("south".to_owned())
)
);
}
#[test]
fn select() {
// Build first segment.
@ -754,13 +799,21 @@ mod test {
let mut table = Table::new("cpu".to_owned(), rg);
let exp_col_types = vec![
("region".to_owned(), LogicalDataType::String),
("count".to_owned(), LogicalDataType::Unsigned),
("time".to_owned(), LogicalDataType::Integer),
("region", LogicalDataType::String),
("count", LogicalDataType::Unsigned),
("time", LogicalDataType::Integer),
]
.into_iter()
.collect::<BTreeMap<_, _>>();
assert_eq!(table.column_logical_types(), &exp_col_types);
assert_eq!(
table
.meta
.columns
.iter()
.map(|(k, v)| (k.as_str(), v.logical_data_type))
.collect::<BTreeMap<_, _>>(),
exp_col_types
);
// Build another segment.
let mut columns = BTreeMap::new();