From 621b26166c830076fcc7907b0c3b05af0fb13212 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 24 Sep 2021 14:21:30 +0100 Subject: [PATCH] feat: validate predicates on read_filter --- read_buffer/src/chunk.rs | 28 +++++++++++++++++++++++++--- read_buffer/src/table.rs | 35 +++++++++++++++++++++++------------ server/src/db/chunk.rs | 6 +++++- 3 files changed, 53 insertions(+), 16 deletions(-) diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 309c4c0d12..93f96624d9 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -183,9 +183,10 @@ impl Chunk { predicate: Predicate, select_columns: Selection<'_>, negated_predicates: Vec, - ) -> table::ReadFilterResults { + ) -> Result { self.table .read_filter(&select_columns, &predicate, negated_predicates.as_slice()) + .context(TableError) } /// Returns an iterable collection of data in group columns and aggregate @@ -1024,7 +1025,9 @@ mod test { let predicate = Predicate::with_time_range(&[BinaryExpr::from(("env", "=", "us-west"))], 100, 205); // filter on time - let mut itr = chunk.read_filter(predicate, Selection::All, vec![]); + let mut itr = chunk + .read_filter(predicate, Selection::All, vec![]) + .unwrap(); let exp_env_values = Values::Dictionary(vec![0], vec![Some("us-west")]); let exp_region_values = Values::Dictionary(vec![0], vec![Some("west")]); @@ -1058,6 +1061,13 @@ mod test { assert_rb_column_equals(&first_row_group, "active", &exp_active_values); assert_rb_column_equals(&second_row_group, "time", &Values::I64(vec![200])); // first row from second record batch + // Error when predicate is invalid + let predicate = + Predicate::with_time_range(&[BinaryExpr::from(("env", "=", 22.3))], 100, 205); + assert!(chunk + .read_filter(predicate, Selection::All, vec![]) + .is_err()); + // No more data assert!(itr.next().is_none()); } @@ -1079,7 +1089,9 @@ mod test { let delete_predicates = vec![Predicate::new(vec![BinaryExpr::from(( "region", "=", "west", ))])]; - let mut itr = chunk.read_filter(predicate, Selection::All, delete_predicates); + let mut itr = chunk + .read_filter(predicate, Selection::All, delete_predicates) + .unwrap(); let exp_env_values = Values::Dictionary(vec![0], vec![Some("us-west")]); let exp_region_values = Values::Dictionary(vec![0], vec![Some("east")]); @@ -1127,6 +1139,16 @@ mod test { // No more data assert!(itr.next().is_none()); + + // Error when one of the negated predicates is invalid + let predicate = Predicate::new(vec![BinaryExpr::from(("env", "=", "us-west"))]); + let delete_predicates = vec![ + Predicate::new(vec![BinaryExpr::from(("region", "=", "west"))]), + Predicate::new(vec![BinaryExpr::from(("time", "=", "not a number"))]), + ]; + assert!(chunk + .read_filter(predicate, Selection::All, delete_predicates) + .is_err()); } #[test] diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 8ee9191d6f..aec46636cf 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -3,10 +3,10 @@ use crate::{ row_group::{self, ColumnName, Literal, Predicate, RowGroup}, schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema}, value::{OwnedValue, Scalar, Value}, + BinaryExpr, }; use arrow::record_batch::RecordBatch; use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary}; -use datafusion::physical_plan::expressions::BinaryExpr; use internal_types::selection::Selection; use parking_lot::RwLock; use snafu::{ensure, Snafu}; @@ -247,12 +247,14 @@ impl Table { // Identify set of row groups that might satisfy the predicate. // // Produce a set of these row groups along with a snapshot of the table meta - // data associated with them. + // data associated with them. Returns an error if the provided predicate + // cannot be applied to the row groups with respect to the schema. // // N.B the table read lock is only held as long as it takes to determine // with meta data whether each row group may satisfy the predicate. fn filter_row_groups(&self, predicate: &Predicate) -> (Arc, Vec>) { let table_data = self.table_data.read(); + let mut row_groups = Vec::with_capacity(table_data.data.len()); 'rowgroup: for rg in table_data.data.iter() { @@ -282,10 +284,15 @@ impl Table { columns: &Selection<'_>, predicate: &Predicate, negated_predicates: &[Predicate], - ) -> ReadFilterResults { + ) -> Result { // identify row groups where time range and predicates match could match - // the predicate. Get a snapshot of those and the meta-data. + // the predicate. Get a snapshot of those and the meta-data. Finally, + // validate that the predicate can be applied to the row groups. let (meta, row_groups) = self.filter_row_groups(predicate); + meta.validate_exprs(predicate.iter())?; + for pred in negated_predicates { + meta.validate_exprs(pred.iter())?; + } let schema = ResultSchema { select_columns: match columns { @@ -296,12 +303,12 @@ impl Table { }; // TODO(edd): I think I can remove `predicates` from the results - ReadFilterResults { + Ok(ReadFilterResults { predicate: predicate.clone(), negated_predicates: negated_predicates.to_vec(), schema, row_groups, - } + }) } /// Returns an iterable collection of data in group columns and aggregate @@ -1461,11 +1468,13 @@ mod test { // Get all the results let predicate = Predicate::with_time_range(&[], 1, 31); - let results = table.read_filter( - &Selection::Some(&["time", "count", "region"]), - &predicate, - &[], - ); + let results = table + .read_filter( + &Selection::Some(&["time", "count", "region"]), + &predicate, + &[], + ) + .unwrap(); // check the column types let exp_schema = ResultSchema { @@ -1511,7 +1520,9 @@ mod test { Predicate::with_time_range(&[BinaryExpr::from(("region", "!=", "south"))], 1, 25); // Apply a predicate `WHERE "region" != "south"` - let results = table.read_filter(&Selection::Some(&["time", "region"]), &predicate, &[]); + let results = table + .read_filter(&Selection::Some(&["time", "region"]), &predicate, &[]) + .unwrap(); let exp_schema = ResultSchema { select_columns: vec![ diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index df51d378db..c1cdf3ee99 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -374,7 +374,11 @@ impl QueryChunk for DbChunk { "Negated Predicate pushed down to RUB" ); - let read_results = chunk.read_filter(rb_predicate, selection, negated_delete_exprs); + let read_results = chunk + .read_filter(rb_predicate, selection, negated_delete_exprs) + .context(ReadBufferChunkError { + chunk_id: self.id(), + })?; let schema = chunk .read_filter_table_schema(selection)