Merge pull request #7613 from influxdata/sgc/issue/time_range_7610
fix: Rewrite time range expressions so they apply to entire result set
pull/24376/head
commit
4785ba2e33
|
@ -77,6 +77,18 @@ SELECT i64 FROM m0 WHERE time > '2022-10-31 02:00:10';
|
|||
-- try increasing the number of days 😂
|
||||
SELECT i64 FROM m0 WHERE time > now() - 100000d;
|
||||
|
||||
-- the time range should apply to the entire result set, not just cpu = 'cpu1'
|
||||
SELECT cpu, usage_idle FROM cpu WHERE cpu = 'cpu0' OR cpu = 'cpu1' AND time >= 1667181610000000000;
|
||||
|
||||
-- combining relational and equality operators for time ranges with OR should return no results, as it
|
||||
-- evaluates to false, like InfluxQL
|
||||
SELECT cpu, usage_idle FROM cpu WHERE time >= 1667181610000000000 OR time = 1667181600000000000;
|
||||
|
||||
-- BONUS: InfluxQL now supports combining equality with OR
|
||||
SELECT cpu, usage_idle FROM cpu WHERE time = 1667181610000000000 OR time = 1667181600000000000 AND cpu = 'cpu0';
|
||||
SELECT cpu, usage_idle FROM cpu WHERE time = 1667181610000000000 OR time = 1667181600000000000;
|
||||
|
||||
|
||||
-- NOT NULL test
|
||||
-- WHERE tag1 != '' is the equivalent to tag1 IS NOT NULL
|
||||
-- TODO(sgc): This is working, but likely by accident
|
||||
|
|
|
@ -225,6 +225,39 @@ name: m0
|
|||
| 2022-10-31T02:00:20 | 191 |
|
||||
| 2022-10-31T02:00:30 | 392 |
|
||||
+---------------------+-----+
|
||||
-- InfluxQL: SELECT cpu, usage_idle FROM cpu WHERE cpu = 'cpu0' OR cpu = 'cpu1' AND time >= 1667181610000000000;
|
||||
name: cpu
|
||||
+---------------------+------+------------+
|
||||
| time | cpu | usage_idle |
|
||||
+---------------------+------+------------+
|
||||
| 2022-10-31T02:00:10 | cpu0 | 0.99 |
|
||||
| 2022-10-31T02:00:10 | cpu1 | 1.99 |
|
||||
+---------------------+------+------------+
|
||||
-- InfluxQL: SELECT cpu, usage_idle FROM cpu WHERE time >= 1667181610000000000 OR time = 1667181600000000000;
|
||||
+------+-----+------------+
|
||||
| time | cpu | usage_idle |
|
||||
+------+-----+------------+
|
||||
+------+-----+------------+
|
||||
-- InfluxQL: SELECT cpu, usage_idle FROM cpu WHERE time = 1667181610000000000 OR time = 1667181600000000000 AND cpu = 'cpu0';
|
||||
name: cpu
|
||||
+---------------------+------+------------+
|
||||
| time | cpu | usage_idle |
|
||||
+---------------------+------+------------+
|
||||
| 2022-10-31T02:00:00 | cpu0 | 0.98 |
|
||||
| 2022-10-31T02:00:10 | cpu0 | 0.99 |
|
||||
+---------------------+------+------------+
|
||||
-- InfluxQL: SELECT cpu, usage_idle FROM cpu WHERE time = 1667181610000000000 OR time = 1667181600000000000;
|
||||
name: cpu
|
||||
+---------------------+-----------+------------+
|
||||
| time | cpu | usage_idle |
|
||||
+---------------------+-----------+------------+
|
||||
| 2022-10-31T02:00:00 | cpu-total | 2.98 |
|
||||
| 2022-10-31T02:00:00 | cpu0 | 0.98 |
|
||||
| 2022-10-31T02:00:00 | cpu1 | 1.98 |
|
||||
| 2022-10-31T02:00:10 | cpu-total | 2.99 |
|
||||
| 2022-10-31T02:00:10 | cpu0 | 0.99 |
|
||||
| 2022-10-31T02:00:10 | cpu1 | 1.99 |
|
||||
+---------------------+-----------+------------+
|
||||
-- InfluxQL: SELECT tag1, f64 FROM m0 WHERE tag1 != '';
|
||||
name: m0
|
||||
+---------------------+-------+------+
|
||||
|
|
|
@ -1,11 +1,9 @@
|
|||
mod select;
|
||||
|
||||
use crate::plan::error;
|
||||
use crate::plan::planner::select::{
|
||||
check_exprs_satisfy_columns, fields_to_exprs_no_nulls, make_tag_key_column_meta,
|
||||
plan_with_sort, ToSortExpr,
|
||||
};
|
||||
use crate::plan::planner_rewrite_expression::{rewrite_conditional, rewrite_expr};
|
||||
use crate::plan::planner_time_range_expression::{
|
||||
duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr,
|
||||
};
|
||||
|
@ -14,12 +12,12 @@ use crate::plan::rewriter::{
|
|||
};
|
||||
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
|
||||
use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_to_data_type};
|
||||
use crate::plan::{error, planner_rewrite_expression};
|
||||
use arrow::array::{StringBuilder, StringDictionaryBuilder};
|
||||
use arrow::datatypes::{DataType, Field as ArrowField, Int32Type, Schema as ArrowSchema};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use chrono_tz::Tz;
|
||||
use datafusion::catalog::TableReference;
|
||||
use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
|
||||
use datafusion::common::{DFSchema, DFSchemaRef, Result, ScalarValue, ToDFSchema};
|
||||
use datafusion::datasource::{provider_as_source, MemTable};
|
||||
use datafusion::logical_expr::expr_rewriter::normalize_col;
|
||||
|
@ -28,10 +26,10 @@ use datafusion::logical_expr::logical_plan::Analyze;
|
|||
use datafusion::logical_expr::utils::{expr_as_column_expr, find_aggregate_exprs};
|
||||
use datafusion::logical_expr::{
|
||||
binary_expr, col, date_bin, expr, expr::WindowFunction, lit, lit_timestamp_nano, now,
|
||||
window_function, Aggregate, AggregateFunction, AggregateUDF, Between, BinaryExpr,
|
||||
BuiltInWindowFunction, BuiltinScalarFunction, EmptyRelation, Explain, Expr, ExprSchemable,
|
||||
Extension, GetIndexedField, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ScalarUDF,
|
||||
TableSource, ToStringifiedPlan, WindowFrame, WindowFrameBound, WindowFrameUnits,
|
||||
window_function, Aggregate, AggregateFunction, AggregateUDF, Between, BuiltInWindowFunction,
|
||||
BuiltinScalarFunction, EmptyRelation, Explain, Expr, ExprSchemable, Extension, GetIndexedField,
|
||||
LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ScalarUDF, TableSource, ToStringifiedPlan,
|
||||
WindowFrame, WindowFrameBound, WindowFrameUnits,
|
||||
};
|
||||
use datafusion::prelude::{cast, sum, when, Column};
|
||||
use datafusion_util::{lit_dict, AsExpr};
|
||||
|
@ -931,7 +929,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
) -> Result<Expr> {
|
||||
let expr =
|
||||
self.expr_to_df_expr(&ctx.with_scope(ExprScope::Projection), &field.expr, schemas)?;
|
||||
let expr = rewrite_field_expr(expr, schemas)?;
|
||||
let expr = planner_rewrite_expression::rewrite_field_expr(expr, schemas)?;
|
||||
normalize_col(
|
||||
if let Some(alias) = &field.alias {
|
||||
expr.alias(alias.deref())
|
||||
|
@ -982,6 +980,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
// one or the other is true
|
||||
&& (lhs_time ^ rhs_time)
|
||||
{
|
||||
// The != operator is not supported by InfluxQL for time comparisons, as it would
|
||||
// match every point except the given one, which is potentially very expensive.
|
||||
//
|
||||
// See: https://github.com/influxdata/influxql/blob/3029b95d4ff5951de68e1d7a1c3039236025815a/ast.go#L5831-L5832
|
||||
if op == Operator::NotEq {
|
||||
return error::query("invalid time comparison operator: !=");
|
||||
}
|
||||
|
||||
if lhs_time {
|
||||
(
|
||||
self.conditional_to_df_expr(ctx, lhs, schemas)?,
|
||||
|
@ -1273,7 +1279,8 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
where_clause,
|
||||
schemas,
|
||||
)?;
|
||||
let filter_expr = rewrite_conditional_expr(filter_expr, schemas)?;
|
||||
let filter_expr =
|
||||
planner_rewrite_expression::rewrite_conditional_expr(filter_expr, schemas)?;
|
||||
let plan = LogicalPlanBuilder::from(plan)
|
||||
.filter(filter_expr)?
|
||||
.build()?;
|
||||
|
@ -2113,72 +2120,6 @@ fn find_tag_and_unknown_columns(fields: &FieldList) -> impl Iterator<Item = &str
|
|||
})
|
||||
}
|
||||
|
||||
/// Perform a series of passes to rewrite `expr` in compliance with InfluxQL behavior
|
||||
/// in an effort to ensure the query executes without error.
|
||||
fn rewrite_conditional_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
|
||||
let expr = expr.rewrite(&mut FixRegularExpressions { schemas })?;
|
||||
rewrite_conditional(expr, schemas)
|
||||
}
|
||||
|
||||
/// Perform a series of passes to rewrite `expr`, used as a column projection,
|
||||
/// to match the behavior of InfluxQL.
|
||||
fn rewrite_field_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
|
||||
rewrite_expr(expr, schemas)
|
||||
}
|
||||
|
||||
/// Rewrite regex conditional expressions to match InfluxQL behaviour.
|
||||
struct FixRegularExpressions<'a> {
|
||||
schemas: &'a Schemas,
|
||||
}
|
||||
|
||||
impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> {
|
||||
type N = Expr;
|
||||
|
||||
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
|
||||
match expr {
|
||||
// InfluxQL evaluates regular expression conditions to false if the column is numeric
|
||||
// or the column doesn't exist.
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
left,
|
||||
op: op @ (Operator::RegexMatch | Operator::RegexNotMatch),
|
||||
right,
|
||||
}) => {
|
||||
if let Expr::Column(ref col) = *left {
|
||||
match self.schemas.iox_schema.field_by_name(&col.name) {
|
||||
Some((InfluxColumnType::Tag, _)) => {
|
||||
// Regular expressions expect to be compared with a Utf8
|
||||
let left =
|
||||
Box::new(left.cast_to(&DataType::Utf8, &self.schemas.df_schema)?);
|
||||
Ok(Expr::BinaryExpr(BinaryExpr { left, op, right }))
|
||||
}
|
||||
Some((InfluxColumnType::Field(InfluxFieldType::String), _)) => {
|
||||
Ok(Expr::BinaryExpr(BinaryExpr { left, op, right }))
|
||||
}
|
||||
// Any other column type should evaluate to false
|
||||
_ => Ok(lit(false)),
|
||||
}
|
||||
} else {
|
||||
// If this is not a simple column expression, evaluate to false,
|
||||
// to be consistent with InfluxQL.
|
||||
//
|
||||
// References:
|
||||
//
|
||||
// * https://github.com/influxdata/influxdb/blob/9308b6586a44e5999180f64a96cfb91e372f04dd/tsdb/index.go#L2487-L2488
|
||||
// * https://github.com/influxdata/influxdb/blob/9308b6586a44e5999180f64a96cfb91e372f04dd/tsdb/index.go#L2509-L2510
|
||||
//
|
||||
// The query engine does not correctly evaluate tag keys and values, always evaluating to false.
|
||||
//
|
||||
// Reference example:
|
||||
//
|
||||
// * `SELECT f64 FROM m0 WHERE tag0 = '' + tag0`
|
||||
Ok(lit(false))
|
||||
}
|
||||
}
|
||||
_ => Ok(expr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn conditional_op_to_operator(op: ConditionalOperator) -> Result<Operator> {
|
||||
match op {
|
||||
ConditionalOperator::Eq => Ok(Operator::Eq),
|
||||
|
@ -3055,6 +2996,14 @@ mod test {
|
|||
"###
|
||||
);
|
||||
|
||||
// fallible
|
||||
|
||||
// Unsupported operator
|
||||
assert_snapshot!(plan("SELECT foo, f64_field FROM data where time != 0"), @"Error during planning: invalid time comparison operator: !=")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_regex_in_where() {
|
||||
// Regular expression equality tests
|
||||
|
||||
assert_snapshot!(plan("SELECT foo, f64_field FROM data where foo =~ /f/"), @r###"
|
||||
|
|
|
@ -121,16 +121,377 @@
|
|||
//! [`Reduce`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4850-L4852
|
||||
//! [`EvalBool`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4181-L4183
|
||||
//! [`Eval`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4137
|
||||
use crate::plan::error;
|
||||
use crate::plan::util::Schemas;
|
||||
use arrow::datatypes::DataType;
|
||||
use datafusion::common::tree_node::{Transformed, TreeNode};
|
||||
use datafusion::common::{Result, ScalarValue};
|
||||
use datafusion::common::tree_node::{
|
||||
Transformed, TreeNode, TreeNodeRewriter, TreeNodeVisitor, VisitRecursion,
|
||||
};
|
||||
use datafusion::common::{Column, Result, ScalarValue};
|
||||
use datafusion::logical_expr::{
|
||||
binary_expr, cast, coalesce, lit, BinaryExpr, Expr, ExprSchemable, Operator,
|
||||
};
|
||||
use datafusion::optimizer::utils::{conjunction, disjunction};
|
||||
use schema::{InfluxColumnType, InfluxFieldType};
|
||||
|
||||
/// Perform a series of passes to rewrite `expr` in compliance with InfluxQL behavior
|
||||
/// in an effort to ensure the query executes without error.
|
||||
pub(super) fn rewrite_conditional_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
|
||||
let expr = expr.rewrite(&mut FixRegularExpressions { schemas })?;
|
||||
let expr = rewrite_time_range_exprs(expr)?;
|
||||
rewrite_conditional(expr, schemas)
|
||||
}
|
||||
|
||||
/// Perform a series of passes to rewrite `expr`, used as a column projection,
|
||||
/// to match the behavior of InfluxQL.
|
||||
pub(super) fn rewrite_field_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
|
||||
rewrite_expr(expr, schemas)
|
||||
}
|
||||
|
||||
/// Traverse `expr` and promote time range expressions to the root
|
||||
/// binary node on the left hand side and combined using the conjunction (`AND`)
|
||||
/// operator to ensure the time range is applied to the entire predicate.
|
||||
///
|
||||
/// Additionally, multiple `time = <timestamp>` expressions are combined
|
||||
/// using the disjunction (OR) operator, whereas, InfluxQL only supports
|
||||
/// a single `time = <timestamp>` expression.
|
||||
///
|
||||
/// # NOTE
|
||||
///
|
||||
/// Combining relational operators like `time > now() - 5s` and equality
|
||||
/// operators like `time = <timestamp>` with a disjunction (`OR`)
|
||||
/// will evaluate to false, like InfluxQL.
|
||||
///
|
||||
/// # Background
|
||||
///
|
||||
/// The InfluxQL query engine always promotes the time range expression to filter
|
||||
/// all results. It is misleading that time ranges are written in the `WHERE` clause,
|
||||
/// as the `WHERE` predicate is not evaluated in its entirety for each row. Rather,
|
||||
/// InfluxQL extracts the time range to form a time bound for the entire query and
|
||||
/// removes any time range expressions from the filter predicate. The time range
|
||||
/// is determined using the `>` and `≥` operators to form the lower bound and
|
||||
/// the `<` and `≤` operators to form the upper bound. When multiple instances of
|
||||
/// the lower or upper bound operators are found, the time bounds will form the
|
||||
/// intersection. For example
|
||||
///
|
||||
/// ```sql
|
||||
/// WHERE time >= 1000 AND time >= 2000 AND time < 10000 AND time < 9000
|
||||
/// ```
|
||||
///
|
||||
/// is equivalent to
|
||||
///
|
||||
/// ```sql
|
||||
/// WHERE time >= 2000 AND time < 9000
|
||||
/// ```
|
||||
///
|
||||
/// Further, InfluxQL only allows a single `time = <value>` binary expression. Multiple
|
||||
/// occurrences result in an empty result set.
|
||||
///
|
||||
/// ## Examples
|
||||
///
|
||||
/// Let's illustrate how InfluxQL applies predicates with a typical example, using the
|
||||
/// `metrics.lp` data in the IOx repository:
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT cpu, usage_idle FROM cpu
|
||||
/// WHERE
|
||||
/// time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0'
|
||||
/// ```
|
||||
///
|
||||
/// InfluxQL first filters rows based on the time range:
|
||||
///
|
||||
/// ```sql
|
||||
/// '2020-06-11T16:53:30Z' < time < '2020-06-11T16:55:00Z'
|
||||
/// ```
|
||||
///
|
||||
/// and then applies the predicate to the individual rows:
|
||||
///
|
||||
/// ```sql
|
||||
/// cpu = 'cpu0'
|
||||
/// ```
|
||||
///
|
||||
/// Producing the following result:
|
||||
///
|
||||
/// ```text
|
||||
/// name: cpu
|
||||
/// time cpu usage_idle
|
||||
/// ---- --- ----------
|
||||
/// 2020-06-11T16:53:40Z cpu0 90.29029029029029
|
||||
/// 2020-06-11T16:53:50Z cpu0 89.8
|
||||
/// 2020-06-11T16:54:00Z cpu0 90.09009009009009
|
||||
/// 2020-06-11T16:54:10Z cpu0 88.82235528942115
|
||||
/// ```
|
||||
///
|
||||
/// The following example is a little more complicated, but shows again how InfluxQL
|
||||
/// separates the time ranges from the predicate:
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT cpu, usage_idle FROM cpu
|
||||
/// WHERE
|
||||
/// time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0' OR cpu = 'cpu1'
|
||||
/// ```
|
||||
///
|
||||
/// InfluxQL first filters rows based on the time range:
|
||||
///
|
||||
/// ```sql
|
||||
/// '2020-06-11T16:53:30Z' < time < '2020-06-11T16:55:00Z'
|
||||
/// ```
|
||||
///
|
||||
/// and then applies the predicate to the individual rows:
|
||||
///
|
||||
/// ```sql
|
||||
/// cpu = 'cpu0' OR cpu = 'cpu1'
|
||||
/// ```
|
||||
///
|
||||
/// This is certainly quite different to SQL, which would evaluate the predicate as:
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT cpu, usage_idle FROM cpu
|
||||
/// WHERE
|
||||
/// (time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0') OR cpu = 'cpu1'
|
||||
/// ```
|
||||
///
|
||||
/// ## Time ranges are not normal
|
||||
///
|
||||
/// Here we demonstrate how the operators combining time ranges do not matter. Using the
|
||||
/// original query:
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT cpu, usage_idle FROM cpu
|
||||
/// WHERE
|
||||
/// time > '2020-06-11T16:53:30Z' AND time < '2020-06-11T16:55:00Z' AND cpu = 'cpu0'
|
||||
/// ```
|
||||
///
|
||||
/// we replace all `AND` operators with `OR`:
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT cpu, usage_idle FROM cpu
|
||||
/// WHERE
|
||||
/// time > '2020-06-11T16:53:30Z' OR time < '2020-06-11T16:55:00Z' OR cpu = 'cpu0'
|
||||
/// ```
|
||||
///
|
||||
/// This should return all rows, yet it returns the same result 🤯:
|
||||
///
|
||||
/// ```text
|
||||
/// name: cpu
|
||||
/// time cpu usage_idle
|
||||
/// ---- --- ----------
|
||||
/// 2020-06-11T16:53:40Z cpu0 90.29029029029029
|
||||
/// 2020-06-11T16:53:50Z cpu0 89.8
|
||||
/// 2020-06-11T16:54:00Z cpu0 90.09009009009009
|
||||
/// 2020-06-11T16:54:10Z cpu0 88.82235528942115
|
||||
/// ```
|
||||
///
|
||||
/// It becomes clearer if we again review how InfluxQL OG evaluates the `WHERE`
|
||||
/// predicate. InfluxQL first filters rows based on the time range, which uses the
|
||||
/// rules previously defined by finding `>` and `≥` to determine the lower bound
|
||||
/// and `<` and `≤` to determine the upper bound:
|
||||
///
|
||||
/// ```sql
|
||||
/// '2020-06-11T16:53:30Z' < time < '2020-06-11T16:55:00Z'
|
||||
/// ```
|
||||
///
|
||||
/// and then applies the predicate to the individual rows:
|
||||
///
|
||||
/// ```sql
|
||||
/// cpu = 'cpu0'
|
||||
/// ```
|
||||
///
|
||||
/// ## How to think of time ranges intuitively
|
||||
///
|
||||
/// Imagine a slight variation of InfluxQL has a separate _time bounds clause_.
|
||||
/// It could have two forms, first as a `BETWEEN`
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT cpu, usage_idle FROM cpu
|
||||
/// WITH TIME BETWEEN '2020-06-11T16:53:30Z' AND '2020-06-11T16:55:00Z'
|
||||
/// WHERE
|
||||
/// cpu = 'cpu0'
|
||||
/// ```
|
||||
///
|
||||
/// or as an `IN` to select multiple points:
|
||||
///
|
||||
/// ```sql
|
||||
/// SELECT cpu, usage_idle FROM cpu
|
||||
/// WITH TIME IN ('2004-04-09T12:00:00Z', '2004-04-09T12:00:10Z', ...)
|
||||
/// WHERE
|
||||
/// cpu = 'cpu0'
|
||||
/// ```
|
||||
fn rewrite_time_range_exprs(expr: Expr) -> Result<Expr> {
|
||||
let mut has_or = false;
|
||||
let mut has_time_range = false;
|
||||
expr.apply(&mut |expr| {
|
||||
match expr {
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
op: Operator::Or, ..
|
||||
}) => has_or = true,
|
||||
expr @ Expr::BinaryExpr(_) if is_time_range(expr) => has_time_range = true,
|
||||
_ => return Ok(VisitRecursion::Continue),
|
||||
}
|
||||
|
||||
Ok(if has_or && has_time_range {
|
||||
// no need to continue if we've found both
|
||||
VisitRecursion::Stop
|
||||
} else {
|
||||
VisitRecursion::Continue
|
||||
})
|
||||
})?;
|
||||
|
||||
// if there are no time range expressions or there are no OR operators,
|
||||
// we don't need to rewrite the expression
|
||||
if !has_time_range || !has_or {
|
||||
return Ok(expr);
|
||||
}
|
||||
|
||||
let mut rw = SeparateTimeRanges::default();
|
||||
expr.visit(&mut rw)?;
|
||||
|
||||
// When `expr` contains both time expressions using relational
|
||||
// operators like > or <= and equality, such as
|
||||
//
|
||||
// WHERE time > now() - 5s OR time = '2004-04-09:12:00:00Z' AND cpu = 'cpu0'
|
||||
//
|
||||
// the entire expression evaluates to `false` to be consistent with InfluxQL.
|
||||
if !rw.relational.is_empty() && !rw.equality.is_empty() {
|
||||
return Ok(lit(false));
|
||||
}
|
||||
|
||||
let lhs = if !rw.relational.is_empty() {
|
||||
conjunction(rw.relational)
|
||||
} else if !rw.equality.is_empty() {
|
||||
disjunction(rw.equality)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let rhs = rw
|
||||
.stack
|
||||
.pop()
|
||||
.ok_or_else(|| error::map::internal("expected expression on stack"))?;
|
||||
|
||||
Ok(match (lhs, rhs) {
|
||||
(Some(lhs), Some(rhs)) => binary_expr(lhs, Operator::And, rhs),
|
||||
(Some(expr), None) | (None, Some(expr)) => expr,
|
||||
(None, None) => lit(true),
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns `true` if `expr` refers to the `time` column.
|
||||
fn is_time_column(expr: &Expr) -> bool {
|
||||
matches!(expr, Expr::Column(Column{ name, .. }) if name == "time")
|
||||
}
|
||||
|
||||
/// Returns `true` if `expr` is a time range expression
|
||||
///
|
||||
/// Examples include:
|
||||
///
|
||||
/// ```text
|
||||
/// time = '2004-04-09T12:00:00Z'
|
||||
/// ```
|
||||
///
|
||||
/// or
|
||||
///
|
||||
/// ```text
|
||||
/// time > now() - 5m
|
||||
/// ```
|
||||
fn is_time_range(expr: &Expr) -> bool {
|
||||
use Operator::*;
|
||||
match expr {
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
left,
|
||||
op: Eq | NotEq | Gt | Lt | GtEq | LtEq,
|
||||
right,
|
||||
}) => is_time_column(left) || is_time_column(right),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct SeparateTimeRanges {
|
||||
stack: Vec<Option<Expr>>,
|
||||
/// Relational time range expressions, such as:
|
||||
///
|
||||
/// ```sql
|
||||
/// time >= now()
|
||||
/// ```
|
||||
relational: Vec<Expr>,
|
||||
|
||||
/// Equality time range expressions, such as:
|
||||
///
|
||||
/// ```sql
|
||||
/// time = '2004-04-09T12:00:00Z'
|
||||
/// ```
|
||||
equality: Vec<Expr>,
|
||||
}
|
||||
|
||||
impl TreeNodeVisitor for SeparateTimeRanges {
|
||||
type N = Expr;
|
||||
|
||||
fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
|
||||
if matches!(node, Expr::BinaryExpr(_)) {
|
||||
Ok(VisitRecursion::Continue)
|
||||
} else {
|
||||
Ok(VisitRecursion::Skip)
|
||||
}
|
||||
}
|
||||
|
||||
fn post_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
|
||||
use Operator::*;
|
||||
|
||||
match node {
|
||||
node @ Expr::BinaryExpr(_) if is_time_range(node) => {
|
||||
self.stack.push(None);
|
||||
// separate equality from relational time-range expressions
|
||||
if matches!(
|
||||
node,
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
op: Gt | GtEq | Lt | LtEq,
|
||||
..
|
||||
})
|
||||
) {
|
||||
self.relational.push(node.clone())
|
||||
} else {
|
||||
self.equality.push(node.clone())
|
||||
}
|
||||
|
||||
Ok(VisitRecursion::Continue)
|
||||
}
|
||||
|
||||
node @ Expr::BinaryExpr(BinaryExpr {
|
||||
op:
|
||||
Eq | NotEq | Gt | GtEq | Lt | LtEq | RegexMatch | RegexNotMatch | RegexIMatch
|
||||
| RegexNotIMatch,
|
||||
..
|
||||
}) => {
|
||||
self.stack.push(Some(node.clone()));
|
||||
Ok(VisitRecursion::Continue)
|
||||
}
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
op: op @ (And | Or),
|
||||
..
|
||||
}) => {
|
||||
let right = self
|
||||
.stack
|
||||
.pop()
|
||||
.ok_or_else(|| error::map::internal("invalid expr stack"))?;
|
||||
let left = self
|
||||
.stack
|
||||
.pop()
|
||||
.ok_or_else(|| error::map::internal("invalid expr stack"))?;
|
||||
self.stack.push(match (left, right) {
|
||||
(Some(left), Some(right)) => Some(binary_expr(left, *op, right)),
|
||||
(None, Some(node)) | (Some(node), None) => Some(node),
|
||||
(None, None) => None,
|
||||
});
|
||||
|
||||
Ok(VisitRecursion::Continue)
|
||||
}
|
||||
_ => Ok(VisitRecursion::Continue),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Rewrite the expression tree and return a boolean result.
|
||||
pub(in crate::plan) fn rewrite_conditional(expr: Expr, schemas: &Schemas) -> Result<Expr> {
|
||||
fn rewrite_conditional(expr: Expr, schemas: &Schemas) -> Result<Expr> {
|
||||
let expr = rewrite_expr(expr, schemas)?;
|
||||
Ok(match expr {
|
||||
Expr::Literal(ScalarValue::Null) => lit(false),
|
||||
|
@ -153,7 +514,7 @@ fn no(expr: Expr) -> Result<Transformed<Expr>> {
|
|||
///
|
||||
/// Rewrite and coerce the expression tree to model the behavior
|
||||
/// of an InfluxQL query.
|
||||
pub(in crate::plan) fn rewrite_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
|
||||
fn rewrite_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
|
||||
expr.transform(&|expr| {
|
||||
match expr {
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
|
@ -392,11 +753,64 @@ fn rewrite_boolean(lhs: Expr, op: Operator, rhs: Expr) -> Expr {
|
|||
}
|
||||
}
|
||||
|
||||
/// Rewrite regex conditional expressions to match InfluxQL behaviour.
|
||||
struct FixRegularExpressions<'a> {
|
||||
schemas: &'a Schemas,
|
||||
}
|
||||
|
||||
impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> {
|
||||
type N = Expr;
|
||||
|
||||
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
|
||||
match expr {
|
||||
// InfluxQL evaluates regular expression conditions to false if the column is numeric
|
||||
// or the column doesn't exist.
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
left,
|
||||
op: op @ (Operator::RegexMatch | Operator::RegexNotMatch),
|
||||
right,
|
||||
}) => {
|
||||
if let Expr::Column(ref col) = *left {
|
||||
match self.schemas.iox_schema.field_by_name(&col.name) {
|
||||
Some((InfluxColumnType::Tag, _)) => {
|
||||
// Regular expressions expect to be compared with a Utf8
|
||||
let left =
|
||||
Box::new(left.cast_to(&DataType::Utf8, &self.schemas.df_schema)?);
|
||||
Ok(Expr::BinaryExpr(BinaryExpr { left, op, right }))
|
||||
}
|
||||
Some((InfluxColumnType::Field(InfluxFieldType::String), _)) => {
|
||||
Ok(Expr::BinaryExpr(BinaryExpr { left, op, right }))
|
||||
}
|
||||
// Any other column type should evaluate to false
|
||||
_ => Ok(lit(false)),
|
||||
}
|
||||
} else {
|
||||
// If this is not a simple column expression, evaluate to false,
|
||||
// to be consistent with InfluxQL.
|
||||
//
|
||||
// References:
|
||||
//
|
||||
// * https://github.com/influxdata/influxdb/blob/9308b6586a44e5999180f64a96cfb91e372f04dd/tsdb/index.go#L2487-L2488
|
||||
// * https://github.com/influxdata/influxdb/blob/9308b6586a44e5999180f64a96cfb91e372f04dd/tsdb/index.go#L2509-L2510
|
||||
//
|
||||
// The query engine does not correctly evaluate tag keys and values, always evaluating to false.
|
||||
//
|
||||
// Reference example:
|
||||
//
|
||||
// * `SELECT f64 FROM m0 WHERE tag0 = '' + tag0`
|
||||
Ok(lit(false))
|
||||
}
|
||||
}
|
||||
_ => Ok(expr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use datafusion::common::{DFSchemaRef, ToDFSchema};
|
||||
use datafusion::logical_expr::lit_timestamp_nano;
|
||||
use datafusion::logical_expr::{lit_timestamp_nano, now};
|
||||
use datafusion_util::AsExpr;
|
||||
use schema::{InfluxFieldType, SchemaBuilder};
|
||||
use std::sync::Arc;
|
||||
|
@ -421,6 +835,123 @@ mod test {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rewrite_time_range_exprs() {
|
||||
use datafusion::common::ScalarValue as V;
|
||||
|
||||
let rewrite = |expr| rewrite_time_range_exprs(expr).unwrap().to_string();
|
||||
|
||||
// does not need a rewrite
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 1000)));
|
||||
|
||||
assert_eq!(rewrite(expr), r#"time >= now() - IntervalDayTime("1000")"#);
|
||||
|
||||
// does not need a rewrite
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 1000)))
|
||||
.and("time".as_expr().lt(now()));
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= now() - IntervalDayTime("1000") AND time < now()"#
|
||||
);
|
||||
|
||||
// does not need a rewrite
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 1000)))
|
||||
.and("cpu".as_expr().eq(lit("cpu0")));
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= now() - IntervalDayTime("1000") AND cpu = Utf8("cpu0")"#
|
||||
);
|
||||
|
||||
// does not need a rewrite
|
||||
let expr = "time".as_expr().eq(lit_timestamp_nano(0));
|
||||
assert_eq!(rewrite(expr), r#"time = TimestampNanosecond(0, None)"#);
|
||||
|
||||
// The following expressions require rewrites to promote the time range to the top
|
||||
// of the expression tree
|
||||
|
||||
// instance = 'instance-01' OR instance = 'instance-02' AND time >= now() - 60s
|
||||
let expr = "instance"
|
||||
.as_expr()
|
||||
.eq(lit("instance-01"))
|
||||
.or("instance".as_expr().eq(lit("instance-02")))
|
||||
.and(
|
||||
"time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 60_000))),
|
||||
);
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= now() - IntervalDayTime("60000") AND (instance = Utf8("instance-01") OR instance = Utf8("instance-02"))"#
|
||||
);
|
||||
|
||||
// time >= now - 60s AND time < now() AND cpu = 'cpu0' OR cpu = 'cpu1'
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 60_000)))
|
||||
.and("time".as_expr().lt(now()))
|
||||
.and(
|
||||
"cpu"
|
||||
.as_expr()
|
||||
.eq(lit("cpu0"))
|
||||
.or("cpu".as_expr().eq(lit("cpu1"))),
|
||||
);
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= now() - IntervalDayTime("60000") AND time < now() AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"#
|
||||
);
|
||||
|
||||
// time >= now - 60s AND time < now() OR cpu = 'cpu0' OR cpu = 'cpu1'
|
||||
//
|
||||
// Expects the time range to be combined with a conjunction (AND)
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 60_000)))
|
||||
.and("time".as_expr().lt(now()))
|
||||
.or("cpu"
|
||||
.as_expr()
|
||||
.eq(lit("cpu0"))
|
||||
.or("cpu".as_expr().eq(lit("cpu1"))));
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= now() - IntervalDayTime("60000") AND time < now() AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"#
|
||||
);
|
||||
|
||||
// time = 0 OR time = 10 AND cpu = 'cpu0'
|
||||
let expr = "time".as_expr().eq(lit_timestamp_nano(0)).or("time"
|
||||
.as_expr()
|
||||
.eq(lit_timestamp_nano(10))
|
||||
.and("cpu".as_expr().eq(lit("cpu0"))));
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"(time = TimestampNanosecond(0, None) OR time = TimestampNanosecond(10, None)) AND cpu = Utf8("cpu0")"#
|
||||
);
|
||||
|
||||
// no time
|
||||
let expr = "f64".as_expr().gt_eq(lit(19.5_f64)).or(binary_expr(
|
||||
"f64".as_expr(),
|
||||
Operator::RegexMatch,
|
||||
lit("foo"),
|
||||
));
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
"f64 >= Float64(19.5) OR (f64 ~ Utf8(\"foo\"))"
|
||||
);
|
||||
|
||||
// fallible
|
||||
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.eq(lit_timestamp_nano(0))
|
||||
.or("time".as_expr().gt(now()));
|
||||
assert_eq!(rewrite(expr), "Boolean(false)");
|
||||
}
|
||||
|
||||
/// Tests which validate that division is coalesced to `0`, to handle division by zero,
|
||||
/// which normally returns a `NULL`, but presents as `0` for InfluxQL.
|
||||
///
|
||||
|
|
Loading…
Reference in New Issue