feat: make Table concurrent-safe

pull/24376/head
Edd Robinson 2021-01-28 20:31:00 +00:00
parent 050185ad92
commit 30b90943bc
1 changed files with 370 additions and 287 deletions

View File

@ -15,7 +15,7 @@ use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema};
// Tie data and meta-data together so that they can be wrapped in RWLock. // Tie data and meta-data together so that they can be wrapped in RWLock.
struct RowGroupData { struct RowGroupData {
meta: MetaData, meta: Rc<MetaData>,
data: Vec<Rc<RowGroup>>, data: Vec<Rc<RowGroup>>,
} }
@ -38,10 +38,23 @@ struct RowGroupData {
pub struct Table { pub struct Table {
name: String, name: String,
// Metadata about the table's segments // A table's data is held in a collection of immutable row groups and
meta: MetaData, // mutable meta data (`RowGroupData`).
//
row_groups: RwLock<RowGroupData>, // Concurrent access to the `RowGroupData` is managed via an `RwLock`, which is
// taken in the following circumstances:
//
// * A lock is needed when adding a new row group. It is held as long as it takes to push
// the new row group on a `Vec` and update the table meta-data. This is not long.
//
// * A lock is needed when removing row groups. It is held as long as it takes to remove
// something from a `Vec`, and re-construct new meta-data. This is not long.
//
// * A read lock is needed for all read operations over table data (row groups). However,
// the read lock is only held for as long as it takes to shallow-clone the table data (via
// Rcs) that are required for the read. The expensive process of performing the read
// operation is done in a lock-free manner.
table_data: RwLock<RowGroupData>,
} }
impl Table { impl Table {
@ -49,9 +62,8 @@ impl Table {
pub fn new(name: impl Into<String>, rg: RowGroup) -> Self { pub fn new(name: impl Into<String>, rg: RowGroup) -> Self {
Self { Self {
name: name.into(), name: name.into(),
meta: MetaData::new(rg.metadata()), table_data: RwLock::new(RowGroupData {
row_groups: RwLock::new(RowGroupData { meta: Rc::new(MetaData::new(rg.metadata())),
meta: MetaData::new(rg.metadata()),
data: vec![Rc::new(rg)], data: vec![Rc::new(rg)],
}), }),
} }
@ -59,18 +71,23 @@ impl Table {
/// Add a new row group to this table. /// Add a new row group to this table.
pub fn add_row_group(&mut self, rg: RowGroup) { pub fn add_row_group(&mut self, rg: RowGroup) {
let mut row_groups = self.row_groups.write().unwrap(); let mut row_groups = self.table_data.write().unwrap();
self.meta.update(rg.metadata()); // `meta` can't be modified whilst protected by an Rc so create a new one.
row_groups.meta = Rc::new(MetaData::update_with(
MetaData::clone(&row_groups.meta), // clone meta-data not Rc
rg.metadata(),
));
// Add the new row group data to the table.
row_groups.data.push(Rc::new(rg)); row_groups.data.push(Rc::new(rg));
} }
/// Remove the row group at `position` from table. /// Remove the row group at `position` from table.
pub fn drop_row_group(&mut self, position: usize) { pub fn drop_row_group(&mut self, position: usize) {
let mut row_groups = self.row_groups.write().unwrap(); let mut row_groups = self.table_data.write().unwrap();
row_groups.data.remove(position); // removes row group data row_groups.data.remove(position); // removes row group data
row_groups.meta = MetaData::from(&row_groups.data); // rebuild table row_groups.meta = Rc::new(MetaData::from(&row_groups.data)); // rebuild meta
// meta data
} }
/// The name of the table (equivalent to measurement or table name). /// The name of the table (equivalent to measurement or table name).
@ -80,44 +97,56 @@ impl Table {
/// Determines if this table contains no row groups. /// Determines if this table contains no row groups.
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.row_groups.read().unwrap().data.is_empty() self.table_data.read().unwrap().data.is_empty()
} }
/// The total number of row groups within this table. /// The total number of row groups within this table.
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.row_groups.read().unwrap().data.len() self.table_data.read().unwrap().data.len()
} }
/// The total size of the table in bytes. /// The total size of the table in bytes.
pub fn size(&self) -> u64 { pub fn size(&self) -> u64 {
self.meta.size self.table_data.read().unwrap().meta.size
} }
/// The number of rows in this table. /// The number of rows in this table.
pub fn rows(&self) -> u64 { pub fn rows(&self) -> u64 {
self.meta.rows self.table_data.read().unwrap().meta.rows
} }
/// The time range of all row groups within this table. /// The time range of all row groups within this table.
pub fn time_range(&self) -> Option<(i64, i64)> { pub fn time_range(&self) -> Option<(i64, i64)> {
self.meta.time_range self.table_data.read().unwrap().meta.time_range
}
// Returns an immutable reference to the table's current meta data.
fn meta(&self) -> Rc<MetaData> {
Rc::clone(&self.table_data.read().unwrap().meta)
} }
// Identify set of row groups that might satisfy the predicate. // Identify set of row groups that might satisfy the predicate.
fn filter_row_groups(&self, predicate: &Predicate) -> Vec<Rc<RowGroup>> { //
let mut rgs = Vec::with_capacity(self.len()); // Produce a set of these row groups along with a snapshot of the table meta
// data associated with them.
//
// N.B the table read lock is only held as long as it takes to determine
// with meta data whether each row group may satisfy the predicate.
fn filter_row_groups(&self, predicate: &Predicate) -> (Rc<MetaData>, Vec<Rc<RowGroup>>) {
let table_data = self.table_data.read().unwrap();
let mut row_groups = Vec::with_capacity(table_data.data.len());
'rowgroup: for rg in self.row_groups.read().unwrap().data.iter() { 'rowgroup: for rg in table_data.data.iter() {
// check all expressions in predicate // check all expressions in predicate
if !rg.could_satisfy_conjunctive_binary_expressions(predicate.iter()) { if !rg.could_satisfy_conjunctive_binary_expressions(predicate.iter()) {
continue 'rowgroup; continue 'rowgroup;
} }
// row group could potentially satisfy predicate // row group could potentially satisfy predicate
rgs.push(Rc::clone(rg)); row_groups.push(Rc::clone(&rg));
} }
rgs (Rc::clone(&table_data.meta), row_groups)
} }
/// Select data for the specified column selections with the provided /// Select data for the specified column selections with the provided
@ -135,23 +164,22 @@ impl Table {
predicate: &Predicate, predicate: &Predicate,
) -> ReadFilterResults { ) -> ReadFilterResults {
// identify row groups where time range and predicates match could match // identify row groups where time range and predicates match could match
// using row group meta data, and then execute against those row groups // the predicate. Get a snapshot of those and the meta-data.
// and merge results. let (meta, row_groups) = self.filter_row_groups(predicate);
let rgs = self.filter_row_groups(predicate);
let schema = ResultSchema { let schema = ResultSchema {
select_columns: match columns { select_columns: match columns {
Selection::All => self.meta.schema_for_all_columns(), Selection::All => meta.schema_for_all_columns(),
Selection::Some(column_names) => self.meta.schema_for_column_names(column_names), Selection::Some(column_names) => meta.schema_for_column_names(column_names),
}, },
..ResultSchema::default() ..ResultSchema::default()
}; };
// temp I think I can remove `predicates` from the results // TODO(edd): I think I can remove `predicates` from the results
ReadFilterResults { ReadFilterResults {
predicate: predicate.clone(), predicate: predicate.clone(),
schema, schema,
row_groups: rgs, row_groups,
} }
} }
@ -170,18 +198,18 @@ impl Table {
group_columns: &'input Selection<'_>, group_columns: &'input Selection<'_>,
aggregates: &'input [(ColumnName<'input>, AggregateType)], aggregates: &'input [(ColumnName<'input>, AggregateType)],
) -> ReadAggregateResults { ) -> ReadAggregateResults {
let (meta, row_groups) = self.filter_row_groups(&predicate);
// Filter out any column names that we do not have data for. // Filter out any column names that we do not have data for.
let schema = ResultSchema { let schema = ResultSchema {
group_columns: match group_columns { group_columns: match group_columns {
Selection::All => self.meta.schema_for_all_columns(), Selection::All => meta.schema_for_all_columns(),
Selection::Some(column_names) => self.meta.schema_for_column_names(column_names), Selection::Some(column_names) => meta.schema_for_column_names(column_names),
}, },
aggregate_columns: self.meta.schema_for_aggregate_column_names(aggregates), aggregate_columns: meta.schema_for_aggregate_column_names(aggregates),
..ResultSchema::default() ..ResultSchema::default()
}; };
let row_groups = self.filter_row_groups(&predicate);
// return the iterator to build the results. // return the iterator to build the results.
ReadAggregateResults { ReadAggregateResults {
schema, schema,
@ -418,11 +446,18 @@ impl Table {
/// predicate, whilst `true` indicates one or more rows *might* match the /// predicate, whilst `true` indicates one or more rows *might* match the
/// predicate. /// predicate.
fn could_satisfy_predicate(&self, predicate: &Predicate) -> bool { fn could_satisfy_predicate(&self, predicate: &Predicate) -> bool {
// Get a snapshot of the table data under a read lock.
let (meta, row_groups) = {
let table_data = self.table_data.read().unwrap();
// TODO(edd): assuming `to_vec` calls Rc::clone?
(Rc::clone(&table_data.meta), table_data.data.to_vec())
};
// if the table doesn't have a column for one of the predicate's // if the table doesn't have a column for one of the predicate's
// expressions then the table cannot satisfy the predicate. // expressions then the table cannot satisfy the predicate.
if !predicate if !predicate
.iter() .iter()
.all(|expr| self.meta.columns.contains_key(expr.column())) .all(|expr| meta.columns.contains_key(expr.column()))
{ {
return false; return false;
} }
@ -431,10 +466,7 @@ impl Table {
// predicate then the table itself could satisfy the predicate so return // predicate then the table itself could satisfy the predicate so return
// true. If none of the row groups could match then return false. // true. If none of the row groups could match then return false.
let exprs = predicate.expressions(); let exprs = predicate.expressions();
self.row_groups row_groups
.read()
.unwrap()
.data
.iter() .iter()
.any(|row_group| row_group.could_satisfy_conjunctive_binary_expressions(exprs)) .any(|row_group| row_group.could_satisfy_conjunctive_binary_expressions(exprs))
} }
@ -442,27 +474,34 @@ impl Table {
/// Determines if this table contains one or more rows that satisfy the /// Determines if this table contains one or more rows that satisfy the
/// predicate. /// predicate.
pub fn satisfies_predicate(&self, predicate: &Predicate) -> bool { pub fn satisfies_predicate(&self, predicate: &Predicate) -> bool {
// Get a snapshot of the table data under a read lock.
let (meta, row_groups) = {
let table_data = self.table_data.read().unwrap();
(Rc::clone(&table_data.meta), table_data.data.to_vec())
};
// if the table doesn't have a column for one of the predicate's // if the table doesn't have a column for one of the predicate's
// expressions then the table cannot satisfy the predicate. // expressions then the table cannot satisfy the predicate.
if !predicate if !predicate
.iter() .iter()
.all(|expr| self.meta.columns.contains_key(expr.column())) .all(|expr| meta.columns.contains_key(expr.column()))
{ {
return false; return false;
} }
// apply the predicate to all row groups. Each row group will do its own // Apply the predicate to all row groups. Each row group will do its own
// column pruning based on its column ranges. // column pruning based on its column ranges.
self.row_groups
.read() // The following could be expensive if row group data needs to be
.unwrap() // processed but this operation is now lock-free.
.data row_groups
.iter() .iter()
.any(|row_group| row_group.satisfies_predicate(predicate)) .any(|row_group| row_group.satisfies_predicate(predicate))
} }
} }
// TODO(edd): reduce owned strings here by, e.g., using references as keys. // TODO(edd): reduce owned strings here by, e.g., using references as keys.
#[derive(Clone)]
struct MetaData { struct MetaData {
// The total size of the table in bytes. // The total size of the table in bytes.
size: u64, size: u64,
@ -496,6 +535,50 @@ impl MetaData {
} }
} }
/// Create a new `MetaData` by consuming `this` and incorporating `other`.
pub fn update_with(mut this: Self, other: &row_group::MetaData) -> Self {
// The incoming row group must have exactly the same schema as the
// existing row groups in the table.
assert_eq!(&this.columns, &other.columns);
// update size, rows, column ranges, time range
this.size += other.size;
this.rows += other.rows as u64;
// The incoming row group must have exactly the same schema as the
// existing row groups in the table.
assert_eq!(&this.columns, &other.columns);
// Update the table schema using the incoming row group schema
for (column_name, column_meta) in &other.columns {
let (column_range_min, column_range_max) = &column_meta.range;
let mut curr_range = &mut this
.columns
.get_mut(&column_name.to_string())
.unwrap()
.range;
if column_range_min < &curr_range.0 {
curr_range.0 = column_range_min.clone();
}
if column_range_max > &curr_range.1 {
curr_range.1 = column_range_max.clone();
}
match this.time_range {
Some(time_range) => {
this.time_range = Some((
time_range.0.min(other.time_range.0),
time_range.1.max(other.time_range.1),
));
}
None => panic!("cannot call `update` on empty Metadata"),
}
}
this
}
// Extract schema information for a set of columns. If a column name does // Extract schema information for a set of columns. If a column name does
// not exist within the `Table` schema it is ignored and not present within // not exist within the `Table` schema it is ignored and not present within
// the resulting schema information. // the resulting schema information.
@ -538,43 +621,6 @@ impl MetaData {
pub fn all_column_names(&self) -> Vec<&str> { pub fn all_column_names(&self) -> Vec<&str> {
self.column_names.iter().map(|name| name.as_str()).collect() self.column_names.iter().map(|name| name.as_str()).collect()
} }
pub fn update(&mut self, meta: &row_group::MetaData) {
// update size, rows, column ranges, time range
self.size += meta.size;
self.rows += meta.rows as u64;
// The incoming row group must have exactly the same schema as the
// existing row groups in the table.
assert_eq!(&self.columns, &meta.columns);
// Update the table schema using the incoming row group schema
for (column_name, column_meta) in &meta.columns {
let (column_range_min, column_range_max) = &column_meta.range;
let mut curr_range = &mut self
.columns
.get_mut(&column_name.to_string())
.unwrap()
.range;
if column_range_min < &curr_range.0 {
curr_range.0 = column_range_min.clone();
}
if column_range_max > &curr_range.1 {
curr_range.1 = column_range_max.clone();
}
match self.time_range {
Some(time_range) => {
self.time_range = Some((
time_range.0.min(meta.time_range.0),
time_range.1.max(meta.time_range.1),
));
}
None => panic!("cannot call `update` on empty Metadata"),
}
}
}
} }
impl From<&Vec<Rc<RowGroup>>> for MetaData { impl From<&Vec<Rc<RowGroup>>> for MetaData {
@ -607,6 +653,28 @@ impl ReadFilterResults {
pub fn schema(&self) -> &ResultSchema { pub fn schema(&self) -> &ResultSchema {
&self.schema &self.schema
} }
// useful for testing - materialise all results but don't convert them to
// record batches. Skips any row groups that don't have any results
fn row_group_results(&self) -> Vec<row_group::ReadFilterResult<'_>> {
let select_columns = &self
.schema()
.select_column_names_iter()
.map(|name| name.as_str())
.collect::<Vec<_>>();
self.row_groups
.iter()
.filter_map(|row_group| {
let result = row_group.read_filter(select_columns, &self.predicate);
if result.is_empty() {
None
} else {
Some(result)
}
})
.collect::<Vec<_>>()
}
} }
impl Iterator for ReadFilterResults { impl Iterator for ReadFilterResults {
@ -679,22 +747,11 @@ impl ReadAggregateResults {
pub fn schema(&self) -> &ResultSchema { pub fn schema(&self) -> &ResultSchema {
&self.schema &self.schema
} }
}
/// Implements an iterator on the Table's results for `read_aggregate`. This // Logic to get next result merged across all row groups for the table is
/// iterator will execute against one or more row groups, merging each row group // pulled out so we can decouple this from materialising record batches,
/// result into the last before returning a final set of results. // which means we're not forced to use record batches in tests.
/// fn next_merged_result(&mut self) -> Option<row_group::ReadAggregateResult<'_>> {
/// Merging in this context means unioning all group keys in multiple sets of
/// results, and aggregating together aggregates for duplicate group keys.
///
/// Given that, it's expected that this iterator will only iterate once, but
/// perhaps in the future we will break the work up and send intermediate
/// results back.
impl Iterator for ReadAggregateResults {
type Item = RecordBatch;
fn next(&mut self) -> Option<Self::Item> {
if self.row_groups.is_empty() || self.drained { if self.row_groups.is_empty() || self.drained {
return None; return None;
} }
@ -743,7 +800,26 @@ impl Iterator for ReadAggregateResults {
} }
self.drained = true; self.drained = true;
Some(merged_results.try_into().unwrap()) Some(merged_results)
}
}
/// Implements an iterator on the Table's results for `read_aggregate`. This
/// iterator will execute against one or more row groups, merging each row group
/// result into the last before returning a final set of results.
///
/// Merging in this context means unioning all group keys in multiple sets of
/// results, and aggregating together aggregates for duplicate group keys.
///
/// Given that, it's expected that this iterator will only iterate once, but
/// perhaps in the future we will break the work up and send intermediate
/// results back.
impl Iterator for ReadAggregateResults {
type Item = RecordBatch;
fn next(&mut self) -> Option<Self::Item> {
self.next_merged_result()
.map(|merged_result| merged_result.try_into().unwrap())
} }
} }
@ -770,16 +846,17 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*;
use row_group::ColumnMeta; use row_group::ColumnMeta;
use super::*;
use crate::column::{self, Column}; use crate::column::{self, Column};
use crate::row_group::{BinaryExpr, ColumnType, ReadAggregateResult}; use crate::row_group::{BinaryExpr, ColumnType, ReadAggregateResult};
use crate::schema; use crate::schema;
use crate::schema::LogicalDataType; use crate::schema::LogicalDataType;
#[test] #[test]
fn meta_data_update() { fn meta_data_update_with() {
let rg_meta = row_group::MetaData { let rg_meta = row_group::MetaData {
size: 100, size: 100,
rows: 2000, rows: 2000,
@ -811,24 +888,27 @@ mod test {
) )
); );
meta.update(&row_group::MetaData { meta = MetaData::update_with(
size: 300, meta,
rows: 1500, &row_group::MetaData {
columns: vec![( size: 300,
"region".to_owned(), rows: 1500,
ColumnMeta { columns: vec![(
typ: schema::ColumnType::Tag("region".to_owned()), "region".to_owned(),
logical_data_type: schema::LogicalDataType::String, ColumnMeta {
range: ( typ: schema::ColumnType::Tag("region".to_owned()),
column::OwnedValue::String("east".to_owned()), logical_data_type: schema::LogicalDataType::String,
column::OwnedValue::String("north".to_owned()), range: (
), column::OwnedValue::String("east".to_owned()),
}, column::OwnedValue::String("north".to_owned()),
)] ),
.into_iter() },
.collect::<BTreeMap<_, _>>(), )]
time_range: (10, 3500), .into_iter()
}); .collect::<BTreeMap<_, _>>(),
time_range: (10, 3500),
},
);
assert_eq!(meta.rows, 3500); assert_eq!(meta.rows, 3500);
assert_eq!(meta.size, 400); assert_eq!(meta.size, 400);
@ -844,190 +924,193 @@ mod test {
#[test] #[test]
fn select() { fn select() {
// // Build first segment. // Build first row group.
// let mut columns = BTreeMap::new(); let mut columns = BTreeMap::new();
// let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
// 6][..])); columns.insert("time".to_string(), tc); columns.insert("time".to_string(), tc);
// let rc = ColumnType::Tag(Column::from( let rc = ColumnType::Tag(Column::from(
// &["west", "west", "east", "west", "south", "north"][..], &["west", "west", "east", "west", "south", "north"][..],
// )); ));
// columns.insert("region".to_string(), rc); columns.insert("region".to_string(), rc);
// let fc = ColumnType::Field(Column::from(&[100_u64, 101, 200, let fc = ColumnType::Field(Column::from(&[100_u64, 101, 200, 203, 203, 10][..]));
// 203, 203, 10][..])); columns.insert("count". columns.insert("count".to_string(), fc);
// to_string(), fc);
// let rg = RowGroup::new(6, columns); let rg = RowGroup::new(6, columns);
// let mut table = Table::new("cpu".to_owned(), rg); let mut table = Table::new("cpu".to_owned(), rg);
// let exp_col_types = vec![ let exp_col_types = vec![
// ("region", LogicalDataType::String), ("region", LogicalDataType::String),
// ("count", LogicalDataType::Unsigned), ("count", LogicalDataType::Unsigned),
// ("time", LogicalDataType::Integer), ("time", LogicalDataType::Integer),
// ] ]
// .into_iter() .into_iter()
// .collect::<BTreeMap<_, _>>(); .collect::<BTreeMap<_, _>>();
// assert_eq!( assert_eq!(
// table table
// .meta .meta()
// .columns .columns
// .iter() .iter()
// .map(|(k, v)| (k.as_str(), v.logical_data_type)) .map(|(k, v)| (k.as_str(), v.logical_data_type))
// .collect::<BTreeMap<_, _>>(), .collect::<BTreeMap<_, _>>(),
// exp_col_types exp_col_types
// ); );
// // Build another segment. // Build another row group.
// let mut columns = BTreeMap::new(); let mut columns = BTreeMap::new();
// let tc = ColumnType::Time(Column::from(&[10_i64, 20, let tc = ColumnType::Time(Column::from(&[10_i64, 20, 30][..]));
// 30][..])); columns.insert("time".to_string(), tc); columns.insert("time".to_string(), tc);
// let rc = ColumnType::Tag(Column::from(&["south", "north", let rc = ColumnType::Tag(Column::from(&["south", "north", "east"][..]));
// "east"][..])); columns.insert("region".to_string(), columns.insert("region".to_string(), rc);
// rc); let fc = let fc = ColumnType::Field(Column::from(&[1000_u64, 1002, 1200][..]));
// ColumnType::Field(Column::from(&[1000_u64, 1002, 1200][..])); columns.insert("count".to_string(), fc);
// columns.insert("count".to_string(), fc); let row_group = RowGroup::new(3, columns);
// let segment = RowGroup::new(3, columns); table.add_row_group(row_group);
// table.add_row_group(segment);
// // Get all the results // Get all the results
// let predicate = Predicate::with_time_range(&[], 1, 31); let predicate = Predicate::with_time_range(&[], 1, 31);
// let results = table.read_filter( let results = table.read_filter(&Selection::Some(&["time", "count", "region"]), &predicate);
// &ColumnSelection::Some(&["time", "count", "region"]),
// &predicate,
// );
// // check the column types // check the column types
// let exp_schema = ResultSchema { let exp_schema = ResultSchema {
// select_columns: vec![ select_columns: vec![
// ( (
// schema::ColumnType::Timestamp("time".to_owned()), schema::ColumnType::Timestamp("time".to_owned()),
// LogicalDataType::Integer, LogicalDataType::Integer,
// ), ),
// ( (
// schema::ColumnType::Field("count".to_owned()), schema::ColumnType::Field("count".to_owned()),
// LogicalDataType::Unsigned, LogicalDataType::Unsigned,
// ), ),
// ( (
// schema::ColumnType::Tag("region".to_owned()), schema::ColumnType::Tag("region".to_owned()),
// LogicalDataType::String, LogicalDataType::String,
// ), ),
// ], ],
// ..ResultSchema::default() ..ResultSchema::default()
// }; };
assert_eq!(results.schema(), &exp_schema);
// assert_eq!(results.schema(), &exp_schema); let results = results.row_group_results();
for result in &results {
assert_eq!(result.schema(), &exp_schema);
}
// let mut all = vec![]; assert_eq!(
// for result in results { format!("{}", DisplayReadFilterResults(results)),
// assert_eq!(result.schema(), &exp_schema); "time,count,region
// all.push(result); 1,100,west
// } 2,101,west
3,200,east
4,203,west
5,203,south
6,10,north
10,1000,south
20,1002,north
30,1200,east
",
);
// assert_eq!( let predicate =
// format!("{}", DisplayReadFilterResults(all)), Predicate::with_time_range(&[BinaryExpr::from(("region", "!=", "south"))], 1, 25);
// "time,count,region
// 1,100,west
// 2,101,west
// 3,200,east
// 4,203,west
// 5,203,south
// 6,10,north
// 10,1000,south
// 20,1002,north
// 30,1200,east
// ",
// );
// let predicate = // Apply a predicate `WHERE "region" != "south"`
// Predicate::with_time_range(&[BinaryExpr::from(("region", let results = table.read_filter(&Selection::Some(&["time", "region"]), &predicate);
// "!=", "south"))], 1, 25);
// // Apply a predicate `WHERE "region" != "south"` let exp_schema = ResultSchema {
// let results = select_columns: vec![
// table.read_filter(&ColumnSelection::Some(&["time", "region"]), (
// &predicate); schema::ColumnType::Timestamp("time".to_owned()),
LogicalDataType::Integer,
),
(
schema::ColumnType::Tag("region".to_owned()),
LogicalDataType::String,
),
],
..ResultSchema::default()
};
// let mut all = vec![]; let results = results.row_group_results();
// for result in results { for result in &results {
// all.push(result); assert_eq!(result.schema(), &exp_schema);
// } }
// assert_eq!( assert_eq!(
// format!("{}", DisplayReadFilterResults(all)), format!("{}", DisplayReadFilterResults(results)),
// "time,region "time,region
// 1,west 1,west
// 2,west 2,west
// 3,east 3,east
// 4,west 4,west
// 6,north 6,north
// 20,north 20,north
// ", ",
// ); );
// } }
// #[test] #[test]
// fn read_group_result() { fn read_group_result() {
// let mut result_a = ReadAggregateResult { let mut result_a = ReadAggregateResult {
// schema: ResultSchema { schema: ResultSchema {
// select_columns: vec![], select_columns: vec![],
// group_columns: vec![ group_columns: vec![
// ( (
// schema::ColumnType::Tag("region".to_owned()), schema::ColumnType::Tag("region".to_owned()),
// LogicalDataType::String, LogicalDataType::String,
// ), ),
// ( (
// schema::ColumnType::Tag("host".to_owned()), schema::ColumnType::Tag("host".to_owned()),
// LogicalDataType::String, LogicalDataType::String,
// ), ),
// ], ],
// aggregate_columns: vec![( aggregate_columns: vec![(
// schema::ColumnType::Tag("temp".to_owned()), schema::ColumnType::Tag("temp".to_owned()),
// AggregateType::Sum, AggregateType::Sum,
// LogicalDataType::Integer, LogicalDataType::Integer,
// )], )],
// }, },
// ..ReadAggregateResult::default() ..ReadAggregateResult::default()
// }; };
// result_a.add_row( result_a.add_row(
// vec![Value::String("east"), Value::String("host-a")], vec![Value::String("east"), Value::String("host-a")],
// vec![AggregateResult::Sum(Scalar::I64(10))], vec![AggregateResult::Sum(Scalar::I64(10))],
// ); );
// let mut result_b = ReadAggregateResult { let mut result_b = ReadAggregateResult {
// schema: ResultSchema { schema: ResultSchema {
// select_columns: vec![], select_columns: vec![],
// group_columns: vec![ group_columns: vec![
// ( (
// schema::ColumnType::Tag("region".to_owned()), schema::ColumnType::Tag("region".to_owned()),
// LogicalDataType::String, LogicalDataType::String,
// ), ),
// ( (
// schema::ColumnType::Tag("host".to_owned()), schema::ColumnType::Tag("host".to_owned()),
// LogicalDataType::String, LogicalDataType::String,
// ), ),
// ], ],
// aggregate_columns: vec![( aggregate_columns: vec![(
// schema::ColumnType::Tag("temp".to_owned()), schema::ColumnType::Tag("temp".to_owned()),
// AggregateType::Sum, AggregateType::Sum,
// LogicalDataType::Integer, LogicalDataType::Integer,
// )], )],
// }, },
// ..Default::default() ..Default::default()
// }; };
// result_b.add_row( result_b.add_row(
// vec![Value::String("west"), Value::String("host-b")], vec![Value::String("west"), Value::String("host-b")],
// vec![AggregateResult::Sum(Scalar::I64(100))], vec![AggregateResult::Sum(Scalar::I64(100))],
// ); );
// let results = DisplayReadAggregateResults(vec![result_a, let results = DisplayReadAggregateResults(vec![result_a, result_b]); //Display implementation
// result_b]); //Display implementation assert_eq!(
// assert_eq!( format!("{}", &results),
// format!("{}", &results), "region,host,temp_sum
// "region,host,temp_sum east,host-a,10
// east,host-a,10 west,host-b,100
// west,host-b,100 "
// " );
// );
} }
} }