From d9d741969385f6362feceec543d55c7734763c5a Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Wed, 31 May 2023 12:28:03 +1000 Subject: [PATCH] refactor: move time range logic to separate module The `rewrite_expression` module was getting large, so it made sense to move time range logic to its own module. --- iox_query_influxql/src/plan/planner.rs | 2 + .../src/plan/planner/rewrite_expression.rs | 936 +----------------- .../src/plan/planner/test_utils.rs | 38 + .../src/plan/planner/time_range.rs | 909 +++++++++++++++++ 4 files changed, 955 insertions(+), 930 deletions(-) create mode 100644 iox_query_influxql/src/plan/planner/test_utils.rs create mode 100644 iox_query_influxql/src/plan/planner/time_range.rs diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index 301c95ddb7..0af1ffe9a7 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -1,5 +1,7 @@ mod rewrite_expression; mod select; +mod test_utils; +mod time_range; use crate::plan::error; use crate::plan::influxql_time_range_expression::{ diff --git a/iox_query_influxql/src/plan/planner/rewrite_expression.rs b/iox_query_influxql/src/plan/planner/rewrite_expression.rs index f90489a414..5353b3362d 100644 --- a/iox_query_influxql/src/plan/planner/rewrite_expression.rs +++ b/iox_query_influxql/src/plan/planner/rewrite_expression.rs @@ -121,25 +121,19 @@ //! [`Reduce`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4850-L4852 //! [`EvalBool`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4181-L4183 //! 
[`Eval`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4137 -use std::cmp::Ordering; -use std::ops::Bound; use std::sync::Arc; -use crate::plan::error; +use crate::plan::planner::time_range; use crate::plan::util::Schemas; use arrow::datatypes::DataType; -use datafusion::common::tree_node::{ - Transformed, TreeNode, TreeNodeRewriter, TreeNodeVisitor, VisitRecursion, -}; -use datafusion::common::{Column, Result, ScalarValue}; +use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; +use datafusion::common::{Result, ScalarValue}; use datafusion::logical_expr::{ binary_expr, cast, coalesce, lit, BinaryExpr, Expr, ExprSchemable, Operator, }; use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; -use datafusion::optimizer::utils::disjunction; use datafusion::physical_expr::execution_props::ExecutionProps; use datafusion::prelude::when; -use datafusion_util::AsExpr; use observability_deps::tracing::trace; use predicate::rpc_predicate::{iox_expr_rewrite, simplify_predicate}; @@ -163,7 +157,7 @@ pub(super) fn rewrite_conditional_expr( .and_then(|expr| expr.rewrite(&mut FixRegularExpressions { schemas })) .map(|expr| log_rewrite(expr, "after fix_regular_expressions")) // rewrite time predicates to behave like InfluxQL OG - .and_then(|expr| rewrite_time_range_exprs(expr, &simplifier)) + .and_then(|expr| time_range::rewrite_time_range_exprs(expr, &simplifier)) .map(|expr| log_rewrite(expr, "after rewrite_time_range_exprs")) // rewrite exprs with incompatible operands to NULL or FALSE // (seems like FixRegularExpressions could be combined into this pass) @@ -220,620 +214,6 @@ pub(in crate::plan) fn rewrite_field_expr(expr: Expr, schemas: &Schemas) -> Resu rewrite_expr(expr, schemas) } -/// Traverse `expr` and promote time range expressions to the root -/// binary node on the left hand side and combined using the conjunction (`AND`) -/// operator to ensure the time range is 
applied to the entire predicate. -/// -/// Additionally, multiple `time = ` expressions are combined -/// using the disjunction (OR) operator, whereas, InfluxQL only supports -/// a single `time = ` expression. -/// -/// # NOTE -/// -/// Combining relational operators like `time > now() - 5s` and equality -/// operators like `time = ` with a disjunction (`OR`) -/// will evaluate to false, like InfluxQL. -/// -/// # Background -/// -/// The InfluxQL query engine always promotes the time range expression to filter -/// all results. It is misleading that time ranges are written in the `WHERE` clause, -/// as the `WHERE` predicate is not evaluated in its entirety for each row. Rather, -/// InfluxQL extracts the time range to form a time bound for the entire query and -/// removes any time range expressions from the filter predicate. The time range -/// is determined using the `>` and `≥` operators to form the lower bound and -/// the `<` and `≤` operators to form the upper bound. When multiple instances of -/// the lower or upper bound operators are found, the time bounds will form the -/// intersection. For example -/// -/// ```sql -/// WHERE time >= 1000 AND time >= 2000 AND time < 10000 and time < 9000 -/// ``` -/// -/// is equivalent to -/// -/// ```sql -/// WHERE time >= 2000 AND time < 9000 -/// ``` -/// -/// Further, InfluxQL only allows a single `time = ` binary expression. Multiple -/// occurrences result in an empty result set. 
-/// -/// ## Examples -/// -/// Lets illustrate how InfluxQL applies predicates with a typical example, using the -/// `metrics.lp` data in the IOx repository: -/// -/// ```sql -/// SELECT cpu, usage_idle FROM cpu -/// WHERE -/// time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0' -/// ``` -/// -/// InfluxQL first filters rows based on the time range: -/// -/// ```sql -/// '2020-06-11T16:53:30Z' < time < '2020-06-11T16:55:00Z' -/// ``` -/// -/// and then applies the predicate to the individual rows: -/// -/// ```sql -/// cpu = 'cpu0' -/// ``` -/// -/// Producing the following result: -/// -/// ```text -/// name: cpu -/// time cpu usage_idle -/// ---- --- ---------- -/// 2020-06-11T16:53:40Z cpu0 90.29029029029029 -/// 2020-06-11T16:53:50Z cpu0 89.8 -/// 2020-06-11T16:54:00Z cpu0 90.09009009009009 -/// 2020-06-11T16:54:10Z cpu0 88.82235528942115 -/// ``` -/// -/// The following example is a little more complicated, but shows again how InfluxQL -/// separates the time ranges from the predicate: -/// -/// ```sql -/// SELECT cpu, usage_idle FROM cpu -/// WHERE -/// time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0' OR cpu = 'cpu1' -/// ``` -/// -/// InfluxQL first filters rows based on the time range: -/// -/// ```sql -/// '2020-06-11T16:53:30Z' < time < '2020-06-11T16:55:00Z' -/// ``` -/// -/// and then applies the predicate to the individual rows: -/// -/// ```sql -/// cpu = 'cpu0' OR cpu = 'cpu1' -/// ``` -/// -/// This is certainly quite different to SQL, which would evaluate the predicate as: -/// -/// ```sql -/// SELECT cpu, usage_idle FROM cpu -/// WHERE -/// (time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0') OR cpu = 'cpu1' -/// ``` -/// -/// ## Time ranges are not normal -/// -/// Here we demonstrate how the operators combining time ranges do not matter. 
Using the -/// original query: -/// -/// ```sql -/// SELECT cpu, usage_idle FROM cpu -/// WHERE -/// time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0' -/// ``` -/// -/// we replace all `AND` operators with `OR`: -/// -/// ```sql -/// SELECT cpu, usage_idle FROM cpu -/// WHERE -/// time > '2020-06-11T16:53:30Z' OR time < '2020-06-11T16:55:00Z' OR cpu = 'cpu0' -/// ``` -/// -/// This should return all rows, but yet it returns the same result 🤯: -/// -/// ```text -/// name: cpu -/// time cpu usage_idle -/// ---- --- ---------- -/// 2020-06-11T16:53:40Z cpu0 90.29029029029029 -/// 2020-06-11T16:53:50Z cpu0 89.8 -/// 2020-06-11T16:54:00Z cpu0 90.09009009009009 -/// 2020-06-11T16:54:10Z cpu0 88.82235528942115 -/// ``` -/// -/// It becomes clearer, if we again review at how InfluxQL OG evaluates the `WHERE` -/// predicate, InfluxQL first filters rows based on the time range, which uses the -/// rules previously defined by finding `>` and `≥` to determine the lower bound -/// and `<` and `≤`: -/// -/// ```sql -/// '2020-06-11T16:53:30Z' < time < '2020-06-11T16:55:00Z' -/// ``` -/// -/// and then applies the predicate to the individual rows: -/// -/// ```sql -/// cpu = 'cpu0' -/// ``` -/// -/// ## How to think of time ranges intuitively -/// -/// Imagine a slight variation of InfluxQL has a separate _time bounds clause_. -/// It could have two forms, first as a `BETWEEN` -/// -/// ```sql -/// SELECT cpu, usage_idle FROM cpu -/// WITH TIME BETWEEN '2020-06-11T16:53:30Z' AND '2020-06-11T16:55:00Z' -/// WHERE -/// cpu = 'cpu0' -/// ``` -/// -/// or as an `IN` to select multiple points: -/// -/// ```sql -/// SELECT cpu, usage_idle FROM cpu -/// WITH TIME IN ('2004-04-09T12:00:00Z', '2004-04-09T12:00:10Z', ...) 
-/// WHERE -/// cpu = 'cpu0' -/// ``` -fn rewrite_time_range_exprs( - expr: Expr, - simplifier: &ExprSimplifier>, -) -> Result { - let has_time_range = matches!( - expr.apply(&mut |expr| Ok(match expr { - expr @ Expr::BinaryExpr(_) if is_time_range(expr) => { - VisitRecursion::Stop - } - _ => VisitRecursion::Continue, - })) - .expect("infallible"), - VisitRecursion::Stop - ); - - // Nothing to do if there are no time range expressions - if !has_time_range { - return Ok(expr); - } - - let mut rw = SeparateTimeRange::new(simplifier); - expr.visit(&mut rw)?; - - // When `expr` contains both time expressions using relational - // operators like > or <= and equality, such as - // - // WHERE time > now() - 5s OR time = '2004-04-09:12:00:00Z' AND cpu = 'cpu0' - // - // the entire expression evaluates to `false` to be consistent with InfluxQL. - if !rw.time_range.is_unbounded() && !rw.equality.is_empty() { - return Ok(lit(false)); - } - - let lhs = if let Some(expr) = rw.time_range.as_expr() { - Some(expr) - } else if !rw.equality.is_empty() { - disjunction(rw.equality) - } else { - None - }; - - let rhs = rw - .stack - .pop() - .ok_or_else(|| error::map::internal("expected expression on stack"))?; - - Ok(match (lhs, rhs) { - (Some(lhs), Some(rhs)) => binary_expr(lhs, Operator::And, rhs), - (Some(expr), None) | (None, Some(expr)) => expr, - (None, None) => lit(true), - }) -} - -/// Returns `true` if `expr` refers to the `time` column. -fn is_time_column(expr: &Expr) -> bool { - matches!(expr, Expr::Column(Column{ name, .. 
}) if name == "time") -} - -/// Returns `true` if `expr` is a time range expression -/// -/// Examples include: -/// -/// ```text -/// time = '2004-04-09T12:00:00Z' -/// ``` -/// -/// or -/// -/// ```text -/// time > now() - 5m -/// ``` -fn is_time_range(expr: &Expr) -> bool { - use Operator::*; - match expr { - Expr::BinaryExpr(BinaryExpr { - left, - op: Eq | NotEq | Gt | Lt | GtEq | LtEq, - right, - }) => is_time_column(left) || is_time_column(right), - _ => false, - } -} - -/// Represents the lower bound, in nanoseconds, of a [`TimeRange`]. -struct LowerBound(Bound); - -impl LowerBound { - /// Create a new, time bound that is unbounded - fn unbounded() -> Self { - Self(Bound::Unbounded) - } - - /// Create a new, time bound that includes `v` - fn included(v: i64) -> Self { - Self(Bound::Included(v)) - } - - /// Create a new, time bound that excludes `v` - fn excluded(v: i64) -> Self { - Self(Bound::Excluded(v)) - } - - /// Returns `true` if the receiver is unbounded. - fn is_unbounded(&self) -> bool { - matches!(self.0, Bound::Unbounded) - } - - fn as_expr(&self) -> Option { - match self.0 { - Bound::Included(ts) => Some( - "time" - .as_expr() - .gt_eq(lit(ScalarValue::TimestampNanosecond(Some(ts), None))), - ), - Bound::Excluded(ts) => Some( - "time" - .as_expr() - .gt(lit(ScalarValue::TimestampNanosecond(Some(ts), None))), - ), - Bound::Unbounded => None, - } - } -} - -impl Default for LowerBound { - fn default() -> Self { - Self::unbounded() - } -} - -impl Eq for LowerBound {} - -impl PartialEq for LowerBound { - fn eq(&self, other: &Self) -> bool { - self.cmp(other) == Ordering::Equal - } -} - -impl PartialOrd for LowerBound { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for LowerBound { - fn cmp(&self, other: &Self) -> Ordering { - match (self.0, other.0) { - (Bound::Unbounded, Bound::Unbounded) => Ordering::Equal, - (Bound::Unbounded, _) => Ordering::Less, - (_, Bound::Unbounded) => Ordering::Greater, - 
(Bound::Included(a), Bound::Included(b)) | (Bound::Excluded(a), Bound::Excluded(b)) => { - a.cmp(&b) - } - (Bound::Included(a), Bound::Excluded(b)) => match a.cmp(&b) { - Ordering::Equal => Ordering::Less, - // We know that if a > b, b + 1 is safe from overflow - Ordering::Greater if a == b + 1 => Ordering::Equal, - ordering => ordering, - }, - (Bound::Excluded(a), Bound::Included(b)) => match a.cmp(&b) { - Ordering::Equal => Ordering::Greater, - // We know that if a < b, a + 1 is safe from overflow - Ordering::Less if a + 1 == b => Ordering::Equal, - ordering => ordering, - }, - } - } -} - -/// Represents the upper bound, in nanoseconds, of a [`TimeRange`]. -struct UpperBound(Bound); - -impl UpperBound { - /// Create a new, unbounded upper bound. - fn unbounded() -> Self { - Self(Bound::Unbounded) - } - - /// Create a new, upper bound that includes `v` - fn included(v: i64) -> Self { - Self(Bound::Included(v)) - } - - /// Create a new, upper bound that excludes `v` - fn excluded(v: i64) -> Self { - Self(Bound::Excluded(v)) - } - - /// Returns `true` if the receiver is unbounded. 
- fn is_unbounded(&self) -> bool { - matches!(self.0, Bound::Unbounded) - } - - fn as_expr(&self) -> Option { - match self.0 { - Bound::Included(ts) => Some( - "time" - .as_expr() - .lt_eq(lit(ScalarValue::TimestampNanosecond(Some(ts), None))), - ), - Bound::Excluded(ts) => Some( - "time" - .as_expr() - .lt(lit(ScalarValue::TimestampNanosecond(Some(ts), None))), - ), - Bound::Unbounded => None, - } - } -} - -impl Default for UpperBound { - fn default() -> Self { - Self::unbounded() - } -} - -impl Eq for UpperBound {} - -impl PartialEq for UpperBound { - fn eq(&self, other: &Self) -> bool { - self.cmp(other) == Ordering::Equal - } -} - -impl PartialOrd for UpperBound { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for UpperBound { - fn cmp(&self, other: &Self) -> Ordering { - match (self.0, other.0) { - (Bound::Unbounded, Bound::Unbounded) => Ordering::Equal, - (Bound::Unbounded, _) => Ordering::Greater, - (_, Bound::Unbounded) => Ordering::Less, - (Bound::Included(a), Bound::Included(b)) | (Bound::Excluded(a), Bound::Excluded(b)) => { - a.cmp(&b) - } - (Bound::Included(a), Bound::Excluded(b)) => match a.cmp(&b) { - Ordering::Equal => Ordering::Greater, - // We know that if a < b, b - 1 is safe from underflow - Ordering::Less if a == b - 1 => Ordering::Equal, - ordering => ordering, - }, - (Bound::Excluded(a), Bound::Included(b)) => match a.cmp(&b) { - Ordering::Equal => Ordering::Less, - // We know that if a > b, a - 1 is safe from overflow - Ordering::Greater if a - 1 == b => Ordering::Equal, - ordering => ordering, - }, - } - } -} - -/// Represents a time range, with a single lower and upper bound. -#[derive(Default, PartialEq, Eq)] -struct TimeRange { - lower: LowerBound, - upper: UpperBound, -} - -impl TimeRange { - /// Returns `true` if the time range is unbounded. 
- fn is_unbounded(&self) -> bool { - self.lower.is_unbounded() && self.upper.is_unbounded() - } - - /// Returns the time range as a conditional expression. - fn as_expr(&self) -> Option { - match (self.lower.as_expr(), self.upper.as_expr()) { - (None, None) => None, - (Some(e), None) | (None, Some(e)) => Some(e), - (Some(lower), Some(upper)) => Some(lower.and(upper)), - } - } -} - -struct SeparateTimeRange<'a> { - simplifier: &'a ExprSimplifier>, - stack: Vec>, - time_range: TimeRange, - - /// Equality time range expressions, such as: - /// - /// ```sql - /// time = '2004-04-09T12:00:00Z' - equality: Vec, -} - -impl<'a> SeparateTimeRange<'a> { - fn new(simplifier: &'a ExprSimplifier>) -> Self { - Self { - simplifier, - stack: vec![], - time_range: Default::default(), - equality: vec![], - } - } -} - -impl<'a> TreeNodeVisitor for SeparateTimeRange<'a> { - type N = Expr; - - fn pre_visit(&mut self, node: &Self::N) -> Result { - if matches!(node, Expr::BinaryExpr(_)) { - Ok(VisitRecursion::Continue) - } else { - Ok(VisitRecursion::Skip) - } - } - - fn post_visit(&mut self, node: &Self::N) -> Result { - use Operator::*; - - match node { - // A binary expression where either the left or right - // expression refers to the "time" column. - // - // "time" OP expression - // expression OP "time" - // - Expr::BinaryExpr(BinaryExpr { - left, - op: op @ (Eq | NotEq | Gt | Lt | GtEq | LtEq), - right, - }) if is_time_column(left) | is_time_column(right) => { - self.stack.push(None); - - if matches!(op, Eq | NotEq) { - let node = self.simplifier.simplify(node.clone())?; - self.equality.push(node); - return Ok(VisitRecursion::Continue); - } - - /// Op is the limited set of operators expected from here on, - /// to avoid repeated wildcard match arms with unreachable!(). 
- enum Op { - Gt, - GtEq, - Lt, - LtEq, - } - - // Map the DataFusion Operator to Op - let op = match op { - Gt => Op::Gt, - GtEq => Op::GtEq, - Lt => Op::Lt, - LtEq => Op::LtEq, - _ => unreachable!("expected: Gt | Lt | GtEq | LtEq"), - }; - - let (expr, op) = if is_time_column(left) { - (right, op) - } else { - // swap the operators when the conditional is `expression OP "time"` - ( - left, - match op { - Op::Gt => Op::Lt, - Op::GtEq => Op::LtEq, - Op::Lt => Op::Gt, - Op::LtEq => Op::GtEq, - }, - ) - }; - - // resolve `now()` and reduce binary expressions to a single constant - let expr = self.simplifier.simplify(*expr.clone())?; - - let ts = match expr { - Expr::Literal(ScalarValue::TimestampNanosecond(Some(ts), _)) => ts, - expr => { - return error::internal(format!( - "expected TimestampNanosecond, got: {}", - expr - )) - } - }; - - match op { - Op::Gt => { - let ts = LowerBound::excluded(ts); - if ts > self.time_range.lower { - self.time_range.lower = ts; - } - } - Op::GtEq => { - let ts = LowerBound::included(ts); - if ts > self.time_range.lower { - self.time_range.lower = ts; - } - } - Op::Lt => { - let ts = UpperBound::excluded(ts); - if ts < self.time_range.upper { - self.time_range.upper = ts; - } - } - Op::LtEq => { - let ts = UpperBound::included(ts); - if ts < self.time_range.upper { - self.time_range.upper = ts; - } - } - } - - Ok(VisitRecursion::Continue) - } - - node @ Expr::BinaryExpr(BinaryExpr { - op: - Eq | NotEq | Gt | GtEq | Lt | LtEq | RegexMatch | RegexNotMatch | RegexIMatch - | RegexNotIMatch, - .. - }) => { - self.stack.push(Some(node.clone())); - Ok(VisitRecursion::Continue) - } - Expr::BinaryExpr(BinaryExpr { - op: op @ (And | Or), - .. 
- }) => { - let right = self - .stack - .pop() - .ok_or_else(|| error::map::internal("invalid expr stack"))?; - let left = self - .stack - .pop() - .ok_or_else(|| error::map::internal("invalid expr stack"))?; - self.stack.push(match (left, right) { - (Some(left), Some(right)) => Some(binary_expr(left, *op, right)), - (None, Some(node)) | (Some(node), None) => Some(node), - (None, None) => None, - }); - - Ok(VisitRecursion::Continue) - } - _ => Ok(VisitRecursion::Continue), - } - } -} - /// The expression was rewritten fn yes(expr: Expr) -> Result<Transformed<Expr>> { Ok(Transformed::Yes(expr)) @@ -1156,188 +536,10 @@ fn rewrite_tag_columns( #[cfg(test)] mod test { use super::*; - use chrono::{DateTime, NaiveDate, Utc}; - use datafusion::common::{DFSchemaRef, ToDFSchema}; - use datafusion::logical_expr::{lit_timestamp_nano, now}; + use crate::plan::planner::test_utils::{execution_props, new_schemas}; + use datafusion::logical_expr::lit_timestamp_nano; use datafusion::prelude::col; use datafusion_util::AsExpr; - use schema::{InfluxFieldType, SchemaBuilder}; - use std::sync::Arc; - - fn new_schemas() -> (Schemas, DataSourceSchema<'static>) { - let iox_schema = SchemaBuilder::new() - .measurement("m0") - .timestamp() - .tag("tag0") - .tag("tag1") - .influx_field("float_field", InfluxFieldType::Float) - .influx_field("integer_field", InfluxFieldType::Integer) - .influx_field("unsigned_field", InfluxFieldType::UInteger) - .influx_field("string_field", InfluxFieldType::String) - .influx_field("boolean_field", InfluxFieldType::Boolean) - .build() - .expect("schema failed"); - let df_schema: DFSchemaRef = Arc::clone(iox_schema.inner()).to_dfschema_ref().unwrap(); - (Schemas { df_schema }, DataSourceSchema::Table(iox_schema)) - } - - #[test] - fn test_rewrite_time_range_exprs() { - use datafusion::common::ScalarValue as V; - - test_helpers::maybe_start_logging(); - - let props = execution_props(); - let (schemas, _) = new_schemas(); - let simplify_context = - 
SimplifyContext::new(&props).with_schema(Arc::clone(&schemas.df_schema)); - let simplifier = ExprSimplifier::new(simplify_context); - - let rewrite = |expr| { - rewrite_time_range_exprs(expr, &simplifier) - .unwrap() - .to_string() - }; - - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 1000))); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531199000000000, None)"# - ); - - // reduces the lower bound to a single expression - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) - .and( - "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 500))), - ); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531199500000000, None)"# - ); - - let expr = "time" - .as_expr() - .lt_eq(now() - lit(V::new_interval_dt(0, 1000))); - assert_eq!( - rewrite(expr), - r#"time <= TimestampNanosecond(1672531199000000000, None)"# - ); - - // reduces the upper bound to a single expression - let expr = "time" - .as_expr() - .lt_eq(now() + lit(V::new_interval_dt(0, 1000))) - .and( - "time" - .as_expr() - .lt_eq(now() + lit(V::new_interval_dt(0, 500))), - ); - assert_eq!( - rewrite(expr), - r#"time <= TimestampNanosecond(1672531200500000000, None)"# - ); - - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) - .and("time".as_expr().lt(now())); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531199000000000, None) AND time < TimestampNanosecond(1672531200000000000, None)"# - ); - - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) - .and("cpu".as_expr().eq(lit("cpu0"))); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531199000000000, None) AND cpu = Utf8("cpu0")"# - ); - - let expr = "time".as_expr().eq(lit_timestamp_nano(0)); - assert_eq!(rewrite(expr), r#"time = TimestampNanosecond(0, None)"#); - - let expr = "instance" - .as_expr() - .eq(lit("instance-01")) - 
.or("instance".as_expr().eq(lit("instance-02"))) - .and( - "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 60_000))), - ); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531140000000000, None) AND (instance = Utf8("instance-01") OR instance = Utf8("instance-02"))"# - ); - - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 60_000))) - .and("time".as_expr().lt(now())) - .and( - "cpu" - .as_expr() - .eq(lit("cpu0")) - .or("cpu".as_expr().eq(lit("cpu1"))), - ); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531140000000000, None) AND time < TimestampNanosecond(1672531200000000000, None) AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"# - ); - - // time >= now - 60s AND time < now() OR cpu = 'cpu0' OR cpu = 'cpu1' - // - // Expects the time range to be combined with a conjunction (AND) - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 60_000))) - .and("time".as_expr().lt(now())) - .or("cpu" - .as_expr() - .eq(lit("cpu0")) - .or("cpu".as_expr().eq(lit("cpu1")))); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531140000000000, None) AND time < TimestampNanosecond(1672531200000000000, None) AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"# - ); - - // time = 0 OR time = 10 AND cpu = 'cpu0' - let expr = "time".as_expr().eq(lit_timestamp_nano(0)).or("time" - .as_expr() - .eq(lit_timestamp_nano(10)) - .and("cpu".as_expr().eq(lit("cpu0")))); - assert_eq!( - rewrite(expr), - r#"(time = TimestampNanosecond(0, None) OR time = TimestampNanosecond(10, None)) AND cpu = Utf8("cpu0")"# - ); - - // no time - let expr = "f64".as_expr().gt_eq(lit(19.5_f64)).or(binary_expr( - "f64".as_expr(), - Operator::RegexMatch, - lit("foo"), - )); - assert_eq!( - rewrite(expr), - "f64 >= Float64(19.5) OR (f64 ~ Utf8(\"foo\"))" - ); - - // fallible - - let expr = "time" - .as_expr() - .eq(lit_timestamp_nano(0)) - .or("time".as_expr().gt(now())); - assert_eq!(rewrite(expr), 
"Boolean(false)"); - } /// Tests which validate that division is coalesced to `0`, to handle division by zero, /// which normally returns a `NULL`, but presents as `0` for InfluxQL. @@ -1430,17 +632,6 @@ mod test { assert_eq!(rewrite(expr), r#"tag0 !~ Utf8("foo")"#); } - fn execution_props() -> ExecutionProps { - let start_time = NaiveDate::from_ymd_opt(2023, 1, 1) - .unwrap() - .and_hms_opt(0, 0, 0) - .unwrap(); - let start_time = DateTime::::from_utc(start_time, Utc); - let mut props = ExecutionProps::new(); - props.query_execution_start_time = start_time; - props - } - #[test] fn test_string_operations() { let props = execution_props(); @@ -1733,119 +924,4 @@ mod test { let expr = col("string_field").not_eq(lit("foo")); assert_eq!(rewrite(expr), r#"string_field != Utf8("foo")"#); } - - #[test] - fn test_lower_bound_cmp() { - let (a, b) = (LowerBound::unbounded(), LowerBound::unbounded()); - assert!(a == b); - - let (a, b) = (LowerBound::included(5), LowerBound::included(5)); - assert!(a == b); - - let (a, b) = (LowerBound::included(5), LowerBound::included(6)); - assert!(a < b); - - // a >= 6 gt a >= 5 - let (a, b) = (LowerBound::included(6), LowerBound::included(5)); - assert!(a > b); - - let (a, b) = (LowerBound::excluded(5), LowerBound::excluded(5)); - assert!(a == b); - - let (a, b) = (LowerBound::excluded(5), LowerBound::excluded(6)); - assert!(a < b); - - let (a, b) = (LowerBound::excluded(6), LowerBound::excluded(5)); - assert!(a > b); - - let (a, b) = (LowerBound::unbounded(), LowerBound::included(5)); - assert!(a < b); - - let (a, b) = (LowerBound::unbounded(), LowerBound::excluded(5)); - assert!(a < b); - - let (a, b) = (LowerBound::included(5), LowerBound::unbounded()); - assert!(a > b); - - let (a, b) = (LowerBound::excluded(5), LowerBound::unbounded()); - assert!(a > b); - - let (a, b) = (LowerBound::included(5), LowerBound::excluded(5)); - assert!(a < b); - - let (a, b) = (LowerBound::included(5), LowerBound::excluded(4)); - assert!(a == b); - - 
let (a, b) = (LowerBound::included(5), LowerBound::excluded(6)); - assert!(a < b); - - let (a, b) = (LowerBound::included(6), LowerBound::excluded(5)); - assert!(a == b); - - let (a, b) = (LowerBound::excluded(5), LowerBound::included(5)); - assert!(a > b); - - let (a, b) = (LowerBound::excluded(5), LowerBound::included(6)); - assert!(a == b); - - let (a, b) = (LowerBound::excluded(6), LowerBound::included(5)); - assert!(a > b); - } - - #[test] - fn test_upper_bound_cmp() { - let (a, b) = (UpperBound::unbounded(), UpperBound::unbounded()); - assert!(a == b); - - let (a, b) = (UpperBound::included(5), UpperBound::included(5)); - assert!(a == b); - - let (a, b) = (UpperBound::included(5), UpperBound::included(6)); - assert!(a < b); - - let (a, b) = (UpperBound::included(6), UpperBound::included(5)); - assert!(a > b); - - let (a, b) = (UpperBound::excluded(5), UpperBound::excluded(5)); - assert!(a == b); - - let (a, b) = (UpperBound::excluded(5), UpperBound::excluded(6)); - assert!(a < b); - - let (a, b) = (UpperBound::excluded(6), UpperBound::excluded(5)); - assert!(a > b); - - let (a, b) = (UpperBound::unbounded(), UpperBound::included(5)); - assert!(a > b); - - let (a, b) = (UpperBound::unbounded(), UpperBound::excluded(5)); - assert!(a > b); - - let (a, b) = (UpperBound::included(5), UpperBound::unbounded()); - assert!(a < b); - - let (a, b) = (UpperBound::excluded(5), UpperBound::unbounded()); - assert!(a < b); - - let (a, b) = (UpperBound::included(5), UpperBound::excluded(5)); - assert!(a > b); - - let (a, b) = (UpperBound::included(5), UpperBound::excluded(4)); - assert!(a > b); - - let (a, b) = (UpperBound::included(5), UpperBound::excluded(6)); - assert!(a == b); - - let (a, b) = (UpperBound::included(5), UpperBound::excluded(7)); - assert!(a < b); - - let (a, b) = (UpperBound::excluded(5), UpperBound::included(5)); - assert!(a < b); - - let (a, b) = (UpperBound::excluded(5), UpperBound::included(6)); - assert!(a < b); - - let (a, b) = 
(UpperBound::excluded(5), UpperBound::included(4)); - assert!(a == b); - } } diff --git a/iox_query_influxql/src/plan/planner/test_utils.rs b/iox_query_influxql/src/plan/planner/test_utils.rs new file mode 100644 index 0000000000..1df2de9d5f --- /dev/null +++ b/iox_query_influxql/src/plan/planner/test_utils.rs @@ -0,0 +1,38 @@ +//! APIs for testing. +#![cfg(test)] + +use crate::plan::ir::DataSourceSchema; +use crate::plan::util::Schemas; +use chrono::{DateTime, NaiveDate, Utc}; +use datafusion::common::{DFSchemaRef, ToDFSchema}; +use datafusion::execution::context::ExecutionProps; +use schema::{InfluxFieldType, SchemaBuilder}; +use std::sync::Arc; + +pub(super) fn new_schemas() -> (Schemas, DataSourceSchema<'static>) { + let iox_schema = SchemaBuilder::new() + .measurement("m0") + .timestamp() + .tag("tag0") + .tag("tag1") + .influx_field("float_field", InfluxFieldType::Float) + .influx_field("integer_field", InfluxFieldType::Integer) + .influx_field("unsigned_field", InfluxFieldType::UInteger) + .influx_field("string_field", InfluxFieldType::String) + .influx_field("boolean_field", InfluxFieldType::Boolean) + .build() + .expect("schema failed"); + let df_schema: DFSchemaRef = Arc::clone(iox_schema.inner()).to_dfschema_ref().unwrap(); + (Schemas { df_schema }, DataSourceSchema::Table(iox_schema)) +} + +pub(super) fn execution_props() -> ExecutionProps { + let start_time = NaiveDate::from_ymd_opt(2023, 1, 1) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + let start_time = DateTime::::from_utc(start_time, Utc); + let mut props = ExecutionProps::new(); + props.query_execution_start_time = start_time; + props +} diff --git a/iox_query_influxql/src/plan/planner/time_range.rs b/iox_query_influxql/src/plan/planner/time_range.rs new file mode 100644 index 0000000000..193d7d734a --- /dev/null +++ b/iox_query_influxql/src/plan/planner/time_range.rs @@ -0,0 +1,909 @@ +use crate::plan::error; +use datafusion::common; +use datafusion::common::tree_node::{TreeNode, 
TreeNodeVisitor, VisitRecursion}; +use datafusion::common::{Column, ScalarValue}; +use datafusion::logical_expr::{binary_expr, lit, Expr, Operator}; +use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; +use datafusion::optimizer::utils::disjunction; +use datafusion_util::AsExpr; +use std::cmp::Ordering; +use std::collections::Bound; + +/// Traverse `expr` and promote time range expressions to the root +/// binary node on the left hand side and combined using the conjunction (`AND`) +/// operator to ensure the time range is applied to the entire predicate. +/// +/// Additionally, multiple `time = <expression>` expressions are combined +/// using the disjunction (OR) operator, whereas, InfluxQL only supports +/// a single `time = <expression>` expression. +/// +/// # NOTE +/// +/// Combining relational operators like `time > now() - 5s` and equality +/// operators like `time = <expression>` with a disjunction (`OR`) +/// will evaluate to false, like InfluxQL. +/// +/// # Background +/// +/// The InfluxQL query engine always promotes the time range expression to filter +/// all results. It is misleading that time ranges are written in the `WHERE` clause, +/// as the `WHERE` predicate is not evaluated in its entirety for each row. Rather, +/// InfluxQL extracts the time range to form a time bound for the entire query and +/// removes any time range expressions from the filter predicate. The time range +/// is determined using the `>` and `≥` operators to form the lower bound and +/// the `<` and `≤` operators to form the upper bound. When multiple instances of +/// the lower or upper bound operators are found, the time bounds will form the +/// intersection. For example: +/// +/// ```sql +/// WHERE time >= 1000 AND time >= 2000 AND time < 10000 and time < 9000 +/// ``` +/// +/// is equivalent to +/// +/// ```sql +/// WHERE time >= 2000 AND time < 9000 +/// ``` +/// +/// Further, InfluxQL only allows a single `time = <expression>` binary expression.
Multiple +/// occurrences result in an empty result set. +/// +/// ## Examples +/// +/// Let's illustrate how InfluxQL applies predicates with a typical example, using the +/// `metrics.lp` data in the IOx repository: +/// +/// ```sql +/// SELECT cpu, usage_idle FROM cpu +/// WHERE +/// time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0' +/// ``` +/// +/// InfluxQL first filters rows based on the time range: +/// +/// ```sql +/// '2020-06-11T16:53:30Z' < time < '2020-06-11T16:55:00Z' +/// ``` +/// +/// and then applies the predicate to the individual rows: +/// +/// ```sql +/// cpu = 'cpu0' +/// ``` +/// +/// Producing the following result: +/// +/// ```text +/// name: cpu +/// time cpu usage_idle +/// ---- --- ---------- +/// 2020-06-11T16:53:40Z cpu0 90.29029029029029 +/// 2020-06-11T16:53:50Z cpu0 89.8 +/// 2020-06-11T16:54:00Z cpu0 90.09009009009009 +/// 2020-06-11T16:54:10Z cpu0 88.82235528942115 +/// ``` +/// +/// The following example is a little more complicated, but shows again how InfluxQL +/// separates the time ranges from the predicate: +/// +/// ```sql +/// SELECT cpu, usage_idle FROM cpu +/// WHERE +/// time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0' OR cpu = 'cpu1' +/// ``` +/// +/// InfluxQL first filters rows based on the time range: +/// +/// ```sql +/// '2020-06-11T16:53:30Z' < time < '2020-06-11T16:55:00Z' +/// ``` +/// +/// and then applies the predicate to the individual rows: +/// +/// ```sql +/// cpu = 'cpu0' OR cpu = 'cpu1' +/// ``` +/// +/// This is certainly quite different to SQL, which would evaluate the predicate as: +/// +/// ```sql +/// SELECT cpu, usage_idle FROM cpu +/// WHERE +/// (time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0') OR cpu = 'cpu1' +/// ``` +/// +/// ## Time ranges are not normal +/// +/// Here we demonstrate how the operators combining time ranges do not matter.
Using the +/// original query: +/// +/// ```sql +/// SELECT cpu, usage_idle FROM cpu +/// WHERE +/// time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0' +/// ``` +/// +/// we replace all `AND` operators with `OR`: +/// +/// ```sql +/// SELECT cpu, usage_idle FROM cpu +/// WHERE +/// time > '2020-06-11T16:53:30Z' OR time < '2020-06-11T16:55:00Z' OR cpu = 'cpu0' +/// ``` +/// +/// This should return all rows, yet it returns the same result 🤯: +/// +/// ```text +/// name: cpu +/// time cpu usage_idle +/// ---- --- ---------- +/// 2020-06-11T16:53:40Z cpu0 90.29029029029029 +/// 2020-06-11T16:53:50Z cpu0 89.8 +/// 2020-06-11T16:54:00Z cpu0 90.09009009009009 +/// 2020-06-11T16:54:10Z cpu0 88.82235528942115 +/// ``` +/// +/// It becomes clearer if we again look at how InfluxQL OG evaluates the `WHERE` +/// predicate: InfluxQL first filters rows based on the time range, which uses the +/// rules previously defined by finding `>` and `≥` to determine the lower bound +/// and `<` and `≤` to determine the upper bound: +/// +/// ```sql +/// '2020-06-11T16:53:30Z' < time < '2020-06-11T16:55:00Z' +/// ``` +/// +/// and then applies the predicate to the individual rows: +/// +/// ```sql +/// cpu = 'cpu0' +/// ``` +/// +/// ## How to think of time ranges intuitively +/// +/// Imagine a slight variation of InfluxQL has a separate _time bounds clause_. +/// It could have two forms, first as a `BETWEEN`: +/// +/// ```sql +/// SELECT cpu, usage_idle FROM cpu +/// WITH TIME BETWEEN '2020-06-11T16:53:30Z' AND '2020-06-11T16:55:00Z' +/// WHERE +/// cpu = 'cpu0' +/// ``` +/// +/// or as an `IN` to select multiple points: +/// +/// ```sql +/// SELECT cpu, usage_idle FROM cpu +/// WITH TIME IN ('2004-04-09T12:00:00Z', '2004-04-09T12:00:10Z', ...)
+/// WHERE +/// cpu = 'cpu0' +/// ``` +pub fn rewrite_time_range_exprs( + expr: Expr, + simplifier: &ExprSimplifier>, +) -> common::Result { + let has_time_range = matches!( + expr.apply(&mut |expr| Ok(match expr { + expr @ Expr::BinaryExpr(_) if is_time_range(expr) => { + VisitRecursion::Stop + } + _ => VisitRecursion::Continue, + })) + .expect("infallible"), + VisitRecursion::Stop + ); + + // Nothing to do if there are no time range expressions + if !has_time_range { + return Ok(expr); + } + + let mut rw = SeparateTimeRange::new(simplifier); + expr.visit(&mut rw)?; + + // When `expr` contains both time expressions using relational + // operators like > or <= and equality, such as + // + // WHERE time > now() - 5s OR time = '2004-04-09:12:00:00Z' AND cpu = 'cpu0' + // + // the entire expression evaluates to `false` to be consistent with InfluxQL. + if !rw.time_range.is_unbounded() && !rw.equality.is_empty() { + return Ok(lit(false)); + } + + let lhs = if let Some(expr) = rw.time_range.as_expr() { + Some(expr) + } else if !rw.equality.is_empty() { + disjunction(rw.equality) + } else { + None + }; + + let rhs = rw + .stack + .pop() + .ok_or_else(|| error::map::internal("expected expression on stack"))?; + + Ok(match (lhs, rhs) { + (Some(lhs), Some(rhs)) => binary_expr(lhs, Operator::And, rhs), + (Some(expr), None) | (None, Some(expr)) => expr, + (None, None) => lit(true), + }) +} + +/// Returns `true` if `expr` refers to the `time` column. +fn is_time_column(expr: &Expr) -> bool { + matches!(expr, Expr::Column(Column{ name, .. 
}) if name == "time") +} + +/// Returns `true` if `expr` is a time range expression +/// +/// Examples include: +/// +/// ```text +/// time = '2004-04-09T12:00:00Z' +/// ``` +/// +/// or +/// +/// ```text +/// time > now() - 5m +/// ``` +fn is_time_range(expr: &Expr) -> bool { + use datafusion::logical_expr::BinaryExpr; + use datafusion::logical_expr::Operator::*; + match expr { + Expr::BinaryExpr(BinaryExpr { + left, + op: Eq | NotEq | Gt | Lt | GtEq | LtEq, + right, + }) => is_time_column(left) || is_time_column(right), + _ => false, + } +} + +/// Represents the lower bound, in nanoseconds, of a [`TimeRange`]. +pub struct LowerBound(Bound); + +impl LowerBound { + /// Create a new, time bound that is unbounded + fn unbounded() -> Self { + Self(Bound::Unbounded) + } + + /// Create a new, time bound that includes `v` + fn included(v: i64) -> Self { + Self(Bound::Included(v)) + } + + /// Create a new, time bound that excludes `v` + fn excluded(v: i64) -> Self { + Self(Bound::Excluded(v)) + } + + /// Returns `true` if the receiver is unbounded. 
+ fn is_unbounded(&self) -> bool { + matches!(self.0, Bound::Unbounded) + } + + fn as_expr(&self) -> Option { + match self.0 { + Bound::Included(ts) => Some( + "time" + .as_expr() + .gt_eq(lit(ScalarValue::TimestampNanosecond(Some(ts), None))), + ), + Bound::Excluded(ts) => Some( + "time" + .as_expr() + .gt(lit(ScalarValue::TimestampNanosecond(Some(ts), None))), + ), + Bound::Unbounded => None, + } + } +} + +impl Default for LowerBound { + fn default() -> Self { + Self::unbounded() + } +} + +impl Eq for LowerBound {} + +impl PartialEq for LowerBound { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl PartialOrd for LowerBound { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for LowerBound { + fn cmp(&self, other: &Self) -> Ordering { + match (self.0, other.0) { + (Bound::Unbounded, Bound::Unbounded) => Ordering::Equal, + (Bound::Unbounded, _) => Ordering::Less, + (_, Bound::Unbounded) => Ordering::Greater, + (Bound::Included(a), Bound::Included(b)) | (Bound::Excluded(a), Bound::Excluded(b)) => { + a.cmp(&b) + } + (Bound::Included(a), Bound::Excluded(b)) => match a.cmp(&b) { + Ordering::Equal => Ordering::Less, + // We know that if a > b, b + 1 is safe from overflow + Ordering::Greater if a == b + 1 => Ordering::Equal, + ordering => ordering, + }, + (Bound::Excluded(a), Bound::Included(b)) => match a.cmp(&b) { + Ordering::Equal => Ordering::Greater, + // We know that if a < b, a + 1 is safe from overflow + Ordering::Less if a + 1 == b => Ordering::Equal, + ordering => ordering, + }, + } + } +} + +/// Represents the upper bound, in nanoseconds, of a [`TimeRange`]. +pub struct UpperBound(Bound); + +impl UpperBound { + /// Create a new, unbounded upper bound. 
+ fn unbounded() -> Self { + Self(Bound::Unbounded) + } + + /// Create a new, upper bound that includes `v` + fn included(v: i64) -> Self { + Self(Bound::Included(v)) + } + + /// Create a new, upper bound that excludes `v` + fn excluded(v: i64) -> Self { + Self(Bound::Excluded(v)) + } + + /// Returns `true` if the receiver is unbounded. + fn is_unbounded(&self) -> bool { + matches!(self.0, Bound::Unbounded) + } + + fn as_expr(&self) -> Option { + match self.0 { + Bound::Included(ts) => Some( + "time" + .as_expr() + .lt_eq(lit(ScalarValue::TimestampNanosecond(Some(ts), None))), + ), + Bound::Excluded(ts) => Some( + "time" + .as_expr() + .lt(lit(ScalarValue::TimestampNanosecond(Some(ts), None))), + ), + Bound::Unbounded => None, + } + } +} + +impl Default for UpperBound { + fn default() -> Self { + Self::unbounded() + } +} + +impl Eq for UpperBound {} + +impl PartialEq for UpperBound { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl PartialOrd for UpperBound { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for UpperBound { + fn cmp(&self, other: &Self) -> Ordering { + match (self.0, other.0) { + (Bound::Unbounded, Bound::Unbounded) => Ordering::Equal, + (Bound::Unbounded, _) => Ordering::Greater, + (_, Bound::Unbounded) => Ordering::Less, + (Bound::Included(a), Bound::Included(b)) | (Bound::Excluded(a), Bound::Excluded(b)) => { + a.cmp(&b) + } + (Bound::Included(a), Bound::Excluded(b)) => match a.cmp(&b) { + Ordering::Equal => Ordering::Greater, + // We know that if a < b, b - 1 is safe from underflow + Ordering::Less if a == b - 1 => Ordering::Equal, + ordering => ordering, + }, + (Bound::Excluded(a), Bound::Included(b)) => match a.cmp(&b) { + Ordering::Equal => Ordering::Less, + // We know that if a > b, a - 1 is safe from overflow + Ordering::Greater if a - 1 == b => Ordering::Equal, + ordering => ordering, + }, + } + } +} + +/// Represents a time range, with a single lower 
and upper bound. +#[derive(Default, PartialEq, Eq)] +struct TimeRange { + lower: LowerBound, + upper: UpperBound, +} + +impl TimeRange { + /// Returns `true` if the time range is unbounded. + fn is_unbounded(&self) -> bool { + self.lower.is_unbounded() && self.upper.is_unbounded() + } + + /// Returns the time range as a conditional expression. + fn as_expr(&self) -> Option { + match (self.lower.as_expr(), self.upper.as_expr()) { + (None, None) => None, + (Some(e), None) | (None, Some(e)) => Some(e), + (Some(lower), Some(upper)) => Some(lower.and(upper)), + } + } +} + +struct SeparateTimeRange<'a> { + simplifier: &'a ExprSimplifier>, + stack: Vec>, + time_range: TimeRange, + + /// Equality time range expressions, such as: + /// + /// ```sql + /// time = '2004-04-09T12:00:00Z' + equality: Vec, +} + +impl<'a> SeparateTimeRange<'a> { + fn new(simplifier: &'a ExprSimplifier>) -> Self { + Self { + simplifier, + stack: vec![], + time_range: Default::default(), + equality: vec![], + } + } +} + +impl<'a> TreeNodeVisitor for SeparateTimeRange<'a> { + type N = Expr; + + fn pre_visit(&mut self, node: &Self::N) -> common::Result { + if matches!(node, Expr::BinaryExpr(_)) { + Ok(VisitRecursion::Continue) + } else { + Ok(VisitRecursion::Skip) + } + } + + fn post_visit(&mut self, node: &Self::N) -> common::Result { + use datafusion::logical_expr::BinaryExpr; + use datafusion::logical_expr::Operator::*; + + match node { + // A binary expression where either the left or right + // expression refers to the "time" column. 
+ // + // "time" OP expression + // expression OP "time" + // + Expr::BinaryExpr(BinaryExpr { + left, + op: op @ (Eq | NotEq | Gt | Lt | GtEq | LtEq), + right, + }) if is_time_column(left) | is_time_column(right) => { + self.stack.push(None); + + if matches!(op, Eq | NotEq) { + let node = self.simplifier.simplify(node.clone())?; + self.equality.push(node); + return Ok(VisitRecursion::Continue); + } + + /// Op is the limited set of operators expected from here on, + /// to avoid repeated wildcard match arms with unreachable!(). + enum Op { + Gt, + GtEq, + Lt, + LtEq, + } + + // Map the DataFusion Operator to Op + let op = match op { + Gt => Op::Gt, + GtEq => Op::GtEq, + Lt => Op::Lt, + LtEq => Op::LtEq, + _ => unreachable!("expected: Gt | Lt | GtEq | LtEq"), + }; + + let (expr, op) = if is_time_column(left) { + (right, op) + } else { + // swap the operators when the conditional is `expression OP "time"` + ( + left, + match op { + Op::Gt => Op::Lt, + Op::GtEq => Op::LtEq, + Op::Lt => Op::Gt, + Op::LtEq => Op::GtEq, + }, + ) + }; + + // resolve `now()` and reduce binary expressions to a single constant + let expr = self.simplifier.simplify(*expr.clone())?; + + let ts = match expr { + Expr::Literal(ScalarValue::TimestampNanosecond(Some(ts), _)) => ts, + expr => { + return error::internal(format!( + "expected TimestampNanosecond, got: {}", + expr + )) + } + }; + + match op { + Op::Gt => { + let ts = LowerBound::excluded(ts); + if ts > self.time_range.lower { + self.time_range.lower = ts; + } + } + Op::GtEq => { + let ts = LowerBound::included(ts); + if ts > self.time_range.lower { + self.time_range.lower = ts; + } + } + Op::Lt => { + let ts = UpperBound::excluded(ts); + if ts < self.time_range.upper { + self.time_range.upper = ts; + } + } + Op::LtEq => { + let ts = UpperBound::included(ts); + if ts < self.time_range.upper { + self.time_range.upper = ts; + } + } + } + + Ok(VisitRecursion::Continue) + } + + node @ Expr::BinaryExpr(BinaryExpr { + op: + Eq | NotEq | Gt | 
GtEq | Lt | LtEq | RegexMatch | RegexNotMatch | RegexIMatch + | RegexNotIMatch, + .. + }) => { + self.stack.push(Some(node.clone())); + Ok(VisitRecursion::Continue) + } + Expr::BinaryExpr(BinaryExpr { + op: op @ (And | Or), + .. + }) => { + let right = self + .stack + .pop() + .ok_or_else(|| error::map::internal("invalid expr stack"))?; + let left = self + .stack + .pop() + .ok_or_else(|| error::map::internal("invalid expr stack"))?; + self.stack.push(match (left, right) { + (Some(left), Some(right)) => Some(binary_expr(left, *op, right)), + (None, Some(node)) | (Some(node), None) => Some(node), + (None, None) => None, + }); + + Ok(VisitRecursion::Continue) + } + _ => Ok(VisitRecursion::Continue), + } + } +} + +#[cfg(test)] +mod test { + use crate::plan::planner::test_utils::{execution_props, new_schemas}; + use crate::plan::planner::time_range::{LowerBound, UpperBound}; + use datafusion::logical_expr::{binary_expr, lit, lit_timestamp_nano, now, Operator}; + use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; + use datafusion_util::AsExpr; + use std::sync::Arc; + + #[test] + fn test_rewrite_time_range_exprs() { + use crate::plan::planner::time_range::rewrite_time_range_exprs; + use datafusion::common::ScalarValue as V; + + test_helpers::maybe_start_logging(); + + let props = execution_props(); + let (schemas, _) = new_schemas(); + let simplify_context = + SimplifyContext::new(&props).with_schema(Arc::clone(&schemas.df_schema)); + let simplifier = ExprSimplifier::new(simplify_context); + + let rewrite = |expr| { + rewrite_time_range_exprs(expr, &simplifier) + .unwrap() + .to_string() + }; + + let expr = "time" + .as_expr() + .gt_eq(now() - lit(V::new_interval_dt(0, 1000))); + assert_eq!( + rewrite(expr), + r#"time >= TimestampNanosecond(1672531199000000000, None)"# + ); + + // reduces the lower bound to a single expression + let expr = "time" + .as_expr() + .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) + .and( + "time" + .as_expr() 
+ .gt_eq(now() - lit(V::new_interval_dt(0, 500))), + ); + assert_eq!( + rewrite(expr), + r#"time >= TimestampNanosecond(1672531199500000000, None)"# + ); + + let expr = "time" + .as_expr() + .lt_eq(now() - lit(V::new_interval_dt(0, 1000))); + assert_eq!( + rewrite(expr), + r#"time <= TimestampNanosecond(1672531199000000000, None)"# + ); + + // reduces the upper bound to a single expression + let expr = "time" + .as_expr() + .lt_eq(now() + lit(V::new_interval_dt(0, 1000))) + .and( + "time" + .as_expr() + .lt_eq(now() + lit(V::new_interval_dt(0, 500))), + ); + assert_eq!( + rewrite(expr), + r#"time <= TimestampNanosecond(1672531200500000000, None)"# + ); + + let expr = "time" + .as_expr() + .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) + .and("time".as_expr().lt(now())); + assert_eq!( + rewrite(expr), + r#"time >= TimestampNanosecond(1672531199000000000, None) AND time < TimestampNanosecond(1672531200000000000, None)"# + ); + + let expr = "time" + .as_expr() + .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) + .and("cpu".as_expr().eq(lit("cpu0"))); + assert_eq!( + rewrite(expr), + r#"time >= TimestampNanosecond(1672531199000000000, None) AND cpu = Utf8("cpu0")"# + ); + + let expr = "time".as_expr().eq(lit_timestamp_nano(0)); + assert_eq!(rewrite(expr), r#"time = TimestampNanosecond(0, None)"#); + + let expr = "instance" + .as_expr() + .eq(lit("instance-01")) + .or("instance".as_expr().eq(lit("instance-02"))) + .and( + "time" + .as_expr() + .gt_eq(now() - lit(V::new_interval_dt(0, 60_000))), + ); + assert_eq!( + rewrite(expr), + r#"time >= TimestampNanosecond(1672531140000000000, None) AND (instance = Utf8("instance-01") OR instance = Utf8("instance-02"))"# + ); + + let expr = "time" + .as_expr() + .gt_eq(now() - lit(V::new_interval_dt(0, 60_000))) + .and("time".as_expr().lt(now())) + .and( + "cpu" + .as_expr() + .eq(lit("cpu0")) + .or("cpu".as_expr().eq(lit("cpu1"))), + ); + assert_eq!( + rewrite(expr), + r#"time >= TimestampNanosecond(1672531140000000000, None) 
AND time < TimestampNanosecond(1672531200000000000, None) AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"# + ); + + // time >= now - 60s AND time < now() OR cpu = 'cpu0' OR cpu = 'cpu1' + // + // Expects the time range to be combined with a conjunction (AND) + let expr = "time" + .as_expr() + .gt_eq(now() - lit(V::new_interval_dt(0, 60_000))) + .and("time".as_expr().lt(now())) + .or("cpu" + .as_expr() + .eq(lit("cpu0")) + .or("cpu".as_expr().eq(lit("cpu1")))); + assert_eq!( + rewrite(expr), + r#"time >= TimestampNanosecond(1672531140000000000, None) AND time < TimestampNanosecond(1672531200000000000, None) AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"# + ); + + // time = 0 OR time = 10 AND cpu = 'cpu0' + let expr = "time".as_expr().eq(lit_timestamp_nano(0)).or("time" + .as_expr() + .eq(lit_timestamp_nano(10)) + .and("cpu".as_expr().eq(lit("cpu0")))); + assert_eq!( + rewrite(expr), + r#"(time = TimestampNanosecond(0, None) OR time = TimestampNanosecond(10, None)) AND cpu = Utf8("cpu0")"# + ); + + // no time + let expr = "f64".as_expr().gt_eq(lit(19.5_f64)).or(binary_expr( + "f64".as_expr(), + Operator::RegexMatch, + lit("foo"), + )); + assert_eq!( + rewrite(expr), + "f64 >= Float64(19.5) OR (f64 ~ Utf8(\"foo\"))" + ); + + // fallible + + let expr = "time" + .as_expr() + .eq(lit_timestamp_nano(0)) + .or("time".as_expr().gt(now())); + assert_eq!(rewrite(expr), "Boolean(false)"); + } + #[test] + fn test_lower_bound_cmp() { + let (a, b) = (LowerBound::unbounded(), LowerBound::unbounded()); + assert!(a == b); + + let (a, b) = (LowerBound::included(5), LowerBound::included(5)); + assert!(a == b); + + let (a, b) = (LowerBound::included(5), LowerBound::included(6)); + assert!(a < b); + + // a >= 6 gt a >= 5 + let (a, b) = (LowerBound::included(6), LowerBound::included(5)); + assert!(a > b); + + let (a, b) = (LowerBound::excluded(5), LowerBound::excluded(5)); + assert!(a == b); + + let (a, b) = (LowerBound::excluded(5), LowerBound::excluded(6)); + assert!(a < b); + + let 
(a, b) = (LowerBound::excluded(6), LowerBound::excluded(5)); + assert!(a > b); + + let (a, b) = (LowerBound::unbounded(), LowerBound::included(5)); + assert!(a < b); + + let (a, b) = (LowerBound::unbounded(), LowerBound::excluded(5)); + assert!(a < b); + + let (a, b) = (LowerBound::included(5), LowerBound::unbounded()); + assert!(a > b); + + let (a, b) = (LowerBound::excluded(5), LowerBound::unbounded()); + assert!(a > b); + + let (a, b) = (LowerBound::included(5), LowerBound::excluded(5)); + assert!(a < b); + + let (a, b) = (LowerBound::included(5), LowerBound::excluded(4)); + assert!(a == b); + + let (a, b) = (LowerBound::included(5), LowerBound::excluded(6)); + assert!(a < b); + + let (a, b) = (LowerBound::included(6), LowerBound::excluded(5)); + assert!(a == b); + + let (a, b) = (LowerBound::excluded(5), LowerBound::included(5)); + assert!(a > b); + + let (a, b) = (LowerBound::excluded(5), LowerBound::included(6)); + assert!(a == b); + + let (a, b) = (LowerBound::excluded(6), LowerBound::included(5)); + assert!(a > b); + } + + #[test] + fn test_upper_bound_cmp() { + let (a, b) = (UpperBound::unbounded(), UpperBound::unbounded()); + assert!(a == b); + + let (a, b) = (UpperBound::included(5), UpperBound::included(5)); + assert!(a == b); + + let (a, b) = (UpperBound::included(5), UpperBound::included(6)); + assert!(a < b); + + let (a, b) = (UpperBound::included(6), UpperBound::included(5)); + assert!(a > b); + + let (a, b) = (UpperBound::excluded(5), UpperBound::excluded(5)); + assert!(a == b); + + let (a, b) = (UpperBound::excluded(5), UpperBound::excluded(6)); + assert!(a < b); + + let (a, b) = (UpperBound::excluded(6), UpperBound::excluded(5)); + assert!(a > b); + + let (a, b) = (UpperBound::unbounded(), UpperBound::included(5)); + assert!(a > b); + + let (a, b) = (UpperBound::unbounded(), UpperBound::excluded(5)); + assert!(a > b); + + let (a, b) = (UpperBound::included(5), UpperBound::unbounded()); + assert!(a < b); + + let (a, b) = (UpperBound::excluded(5), 
UpperBound::unbounded()); + assert!(a < b); + + let (a, b) = (UpperBound::included(5), UpperBound::excluded(5)); + assert!(a > b); + + let (a, b) = (UpperBound::included(5), UpperBound::excluded(4)); + assert!(a > b); + + let (a, b) = (UpperBound::included(5), UpperBound::excluded(6)); + assert!(a == b); + + let (a, b) = (UpperBound::included(5), UpperBound::excluded(7)); + assert!(a < b); + + let (a, b) = (UpperBound::excluded(5), UpperBound::included(5)); + assert!(a < b); + + let (a, b) = (UpperBound::excluded(5), UpperBound::included(6)); + assert!(a < b); + + let (a, b) = (UpperBound::excluded(5), UpperBound::included(4)); + assert!(a == b); + } +}