From 364d245eae28fad16f074d64b4544a5610719a6e Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 17 Sep 2021 16:20:42 -0400 Subject: [PATCH] feat: apply negated delete predicates during scan --- predicate/src/predicate.rs | 87 ++++++++++++++++-------- predicate/src/serialize.rs | 13 ++-- query/src/provider.rs | 56 +++++++++++++-- query/src/util.rs | 32 ++++++++- query_tests/src/scenarios.rs | 20 +++--- query_tests/src/sql.rs | 2 + server/src/db/lifecycle/move_chunk.rs | 2 + tests/end_to_end_cases/influxdb_ioxd.rs | 1 + tests/end_to_end_cases/management_api.rs | 8 +-- 9 files changed, 159 insertions(+), 62 deletions(-) diff --git a/predicate/src/predicate.rs b/predicate/src/predicate.rs index 0a54fd293c..39652c0605 100644 --- a/predicate/src/predicate.rs +++ b/predicate/src/predicate.rs @@ -13,6 +13,7 @@ use datafusion::{ error::DataFusionError, logical_plan::{col, lit, Column, Expr, Operator}, optimizer::utils, + scalar::ScalarValue, }; use datafusion_util::{make_range_expr, AndExprBuilder}; use internal_types::schema::TIME_COLUMN_NAME; @@ -162,41 +163,47 @@ impl Predicate { self == &EMPTY_PREDICATE } - /// Merge the given delete predicates into this select predicate - pub fn merge_delete_predicates(&mut self, delete_predicates: &[S]) + /// Return a negated DF logical expression for the given delete predicates + pub fn negated_expr(delete_predicates: &[S]) -> Option where S: AsRef, { - self.add_delete_ranges(delete_predicates); - self.add_delete_exprs(delete_predicates); - } + if delete_predicates.is_empty() { + return None; + } - /// Add each range [start, stop] of the delete_predicates into the predicate in - /// the form "time < start OR time > stop" to eliminate that range from the query - fn add_delete_ranges(&mut self, delete_predicates: &[S]) - where - S: AsRef, - { - for pred in delete_predicates { - let pred = pred.as_ref(); + let mut pred = PredicateBuilder::default().build(); + pred.merge_delete_predicates(delete_predicates); - if let Some(range) = 
pred.range { - let expr = col(TIME_COLUMN_NAME) - .lt(lit(range.start)) - .or(col(TIME_COLUMN_NAME).gt(lit(range.end))); - self.exprs.push(expr); + // Make a conjunctive expression of the pred.exprs + let mut val = None; + for e in pred.exprs { + match val { + None => val = Some(e), + Some(expr) => val = Some(expr.and(e)), } } + + val + } + + /// Merge the given delete predicates into this select predicate + pub fn merge_delete_predicates(&mut self, delete_predicates: &[S]) + where + S: AsRef, + { + self.add_negated_delete_exprs(delete_predicates); } /// Add a list of disjunctive negated expressions. - /// Example: there are two deletes as follows - /// . Delete_1: WHERE city != "Boston" AND temp = 70 - /// . Delete 2: WHERE state = "NY" AND route != "I90" + /// Example: there are two deletes as follows (note that time_range is stored separately in the Predicate + /// but we need to put it together with the exprs here) + /// . Delete_1: WHERE city != "Boston" AND temp = 70 AND time_range in [10, 30) + /// . 
Delete_2: WHERE state = "NY" AND route != "I90" AND time_range in [20, 50) /// The negated list will be "NOT(Delete_1)", NOT(Delete_2)" which means - /// NOT(city != "Boston" AND temp = 70), NOT(state = "NY" AND route != "I90") which means - /// [NOT(city = Boston") OR NOT(temp = 70)], [NOT(state = "NY") OR NOT(route != "I90")] - fn add_delete_exprs(&mut self, delete_predicates: &[S]) + /// NOT(city != "Boston" AND temp = 70 AND time_range in [10, 30)), NOT(state = "NY" AND route != "I90" AND time_range in [20, 50)) which means + /// [NOT(city != "Boston") OR NOT(temp = 70) OR NOT(time_range in [10, 30))], [NOT(state = "NY") OR NOT(route != "I90") OR NOT(time_range in [20, 50))] + fn add_negated_delete_exprs(&mut self, delete_predicates: &[S]) where S: AsRef, { @@ -204,6 +211,26 @@ impl Predicate { let pred = pred.as_ref(); let mut expr: Option = None; + + // Time range + if let Some(range) = pred.range { + // cast int to timestamp + // NGA todo: add in DF a function timestamp_lit(i64_val) which does lit(ScalarValue::TimestampNanosecond(Some(i64_val))) + // and use it here + let ts_start = ScalarValue::TimestampNanosecond(Some(range.start)); + let ts_end = ScalarValue::TimestampNanosecond(Some(range.end)); + // time_expr = NOT(start <= time_range < end) + let time_expr = col(TIME_COLUMN_NAME) + .lt(lit(ts_start)) + .or(col(TIME_COLUMN_NAME).gt_eq(lit(ts_end))); + + match expr { + None => expr = Some(time_expr), + Some(e) => expr = Some(e.or(time_expr)), + } + } + + // Exprs for exp in &pred.exprs { match expr { None => expr = Some(exp.clone().not()), @@ -610,7 +637,7 @@ impl ParseDeletePredicate { value, quote_style: _, // all quotes are ignored as done in idpe }) => Expr::Column(Column { - relation: Some(table_name.to_string()), + relation: None, name: value.to_string(), }), _ => return false, // not a column name @@ -948,14 +975,14 @@ mod tests { println!("{:#?}", result); let mut expected = vec![]; - let e = col("test.city").eq(lit("Boston")); + let e = 
col("city").eq(lit("Boston")); expected.push(e); let val: i64 = 100; - let e = col("test.cost").not_eq(lit(val)); + let e = col("cost").not_eq(lit(val)); expected.push(e); - let e = col("test.state").not_eq(lit("MA")); + let e = col("state").not_eq(lit("MA")); expected.push(e); - let e = col("test.temp").eq(lit(87.5)); + let e = col("temp").eq(lit(87.5)); expected.push(e); assert_eq!(result, expected) @@ -1003,7 +1030,7 @@ mod tests { let mut expected = vec![]; let num: i64 = 100; - let e = col("test.cost").not_eq(lit(num)); + let e = col("cost").not_eq(lit(num)); expected.push(e); assert_eq!(result.predicate, expected); } diff --git a/predicate/src/serialize.rs b/predicate/src/serialize.rs index 2d34817de2..f8a8a5e235 100644 --- a/predicate/src/serialize.rs +++ b/predicate/src/serialize.rs @@ -146,10 +146,7 @@ pub enum DeserializeError { } /// Deserialize IOx [`Predicate`] from a protobuf object. -pub fn deserialize( - proto_predicate: &proto::Predicate, - table_name: &str, -) -> Result { +pub fn deserialize(proto_predicate: &proto::Predicate) -> Result { let predicate = Predicate { table_names: deserialize_optional_string_set(&proto_predicate.table_names), field_columns: deserialize_optional_string_set(&proto_predicate.field_columns), @@ -158,7 +155,7 @@ pub fn deserialize( exprs: proto_predicate .exprs .iter() - .map(|expr| deserialize_expr(expr, table_name)) + .map(|expr| deserialize_expr(expr)) .collect::, DeserializeError>>()?, }; Ok(predicate) @@ -181,9 +178,9 @@ fn deserialize_timestamp_range(r: &Option) -> Option