feat: validate predicates on read_filter

pull/24376/head
Edd Robinson 2021-09-24 14:21:30 +01:00
parent 053186ab29
commit 621b26166c
3 changed files with 53 additions and 16 deletions

View File

@ -183,9 +183,10 @@ impl Chunk {
predicate: Predicate,
select_columns: Selection<'_>,
negated_predicates: Vec<Predicate>,
) -> table::ReadFilterResults {
) -> Result<table::ReadFilterResults> {
self.table
.read_filter(&select_columns, &predicate, negated_predicates.as_slice())
.context(TableError)
}
/// Returns an iterable collection of data in group columns and aggregate
@ -1024,7 +1025,9 @@ mod test {
let predicate =
Predicate::with_time_range(&[BinaryExpr::from(("env", "=", "us-west"))], 100, 205); // filter on time
let mut itr = chunk.read_filter(predicate, Selection::All, vec![]);
let mut itr = chunk
.read_filter(predicate, Selection::All, vec![])
.unwrap();
let exp_env_values = Values::Dictionary(vec![0], vec![Some("us-west")]);
let exp_region_values = Values::Dictionary(vec![0], vec![Some("west")]);
@ -1058,6 +1061,13 @@ mod test {
assert_rb_column_equals(&first_row_group, "active", &exp_active_values);
assert_rb_column_equals(&second_row_group, "time", &Values::I64(vec![200])); // first row from second record batch
// Error when predicate is invalid
let predicate =
Predicate::with_time_range(&[BinaryExpr::from(("env", "=", 22.3))], 100, 205);
assert!(chunk
.read_filter(predicate, Selection::All, vec![])
.is_err());
// No more data
assert!(itr.next().is_none());
}
@ -1079,7 +1089,9 @@ mod test {
let delete_predicates = vec![Predicate::new(vec![BinaryExpr::from((
"region", "=", "west",
))])];
let mut itr = chunk.read_filter(predicate, Selection::All, delete_predicates);
let mut itr = chunk
.read_filter(predicate, Selection::All, delete_predicates)
.unwrap();
let exp_env_values = Values::Dictionary(vec![0], vec![Some("us-west")]);
let exp_region_values = Values::Dictionary(vec![0], vec![Some("east")]);
@ -1127,6 +1139,16 @@ mod test {
// No more data
assert!(itr.next().is_none());
// Error when one of the negated predicates is invalid
let predicate = Predicate::new(vec![BinaryExpr::from(("env", "=", "us-west"))]);
let delete_predicates = vec![
Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]),
Predicate::new(vec![BinaryExpr::from(("time", "=", "not a number"))]),
];
assert!(chunk
.read_filter(predicate, Selection::All, delete_predicates)
.is_err());
}
#[test]

View File

@ -3,10 +3,10 @@ use crate::{
row_group::{self, ColumnName, Literal, Predicate, RowGroup},
schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema},
value::{OwnedValue, Scalar, Value},
BinaryExpr,
};
use arrow::record_batch::RecordBatch;
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
use datafusion::physical_plan::expressions::BinaryExpr;
use internal_types::selection::Selection;
use parking_lot::RwLock;
use snafu::{ensure, Snafu};
@ -247,12 +247,14 @@ impl Table {
// Identify set of row groups that might satisfy the predicate.
//
// Produce a set of these row groups along with a snapshot of the table meta
// data associated with them.
// data associated with them. Returns an error if the provided predicate
// cannot be applied to the row groups with respect to the schema.
//
// N.B the table read lock is only held as long as it takes to determine
// with meta data whether each row group may satisfy the predicate.
fn filter_row_groups(&self, predicate: &Predicate) -> (Arc<MetaData>, Vec<Arc<RowGroup>>) {
let table_data = self.table_data.read();
let mut row_groups = Vec::with_capacity(table_data.data.len());
'rowgroup: for rg in table_data.data.iter() {
@ -282,10 +284,15 @@ impl Table {
columns: &Selection<'_>,
predicate: &Predicate,
negated_predicates: &[Predicate],
) -> ReadFilterResults {
) -> Result<ReadFilterResults> {
// identify row groups where time range and predicates match could match
// the predicate. Get a snapshot of those and the meta-data.
// the predicate. Get a snapshot of those and the meta-data. Finally,
// validate that the predicate can be applied to the row groups.
let (meta, row_groups) = self.filter_row_groups(predicate);
meta.validate_exprs(predicate.iter())?;
for pred in negated_predicates {
meta.validate_exprs(pred.iter())?;
}
let schema = ResultSchema {
select_columns: match columns {
@ -296,12 +303,12 @@ impl Table {
};
// TODO(edd): I think I can remove `predicates` from the results
ReadFilterResults {
Ok(ReadFilterResults {
predicate: predicate.clone(),
negated_predicates: negated_predicates.to_vec(),
schema,
row_groups,
}
})
}
/// Returns an iterable collection of data in group columns and aggregate
@ -1461,11 +1468,13 @@ mod test {
// Get all the results
let predicate = Predicate::with_time_range(&[], 1, 31);
let results = table.read_filter(
&Selection::Some(&["time", "count", "region"]),
&predicate,
&[],
);
let results = table
.read_filter(
&Selection::Some(&["time", "count", "region"]),
&predicate,
&[],
)
.unwrap();
// check the column types
let exp_schema = ResultSchema {
@ -1511,7 +1520,9 @@ mod test {
Predicate::with_time_range(&[BinaryExpr::from(("region", "!=", "south"))], 1, 25);
// Apply a predicate `WHERE "region" != "south"`
let results = table.read_filter(&Selection::Some(&["time", "region"]), &predicate, &[]);
let results = table
.read_filter(&Selection::Some(&["time", "region"]), &predicate, &[])
.unwrap();
let exp_schema = ResultSchema {
select_columns: vec![

View File

@ -374,7 +374,11 @@ impl QueryChunk for DbChunk {
"Negated Predicate pushed down to RUB"
);
let read_results = chunk.read_filter(rb_predicate, selection, negated_delete_exprs);
let read_results = chunk
.read_filter(rb_predicate, selection, negated_delete_exprs)
.context(ReadBufferChunkError {
chunk_id: self.id(),
})?;
let schema =
chunk
.read_filter_table_schema(selection)