refactor: rewrite time-range expressions to a single range
Fixes gap filling, which was confused by multiple lower or upper time bounds.
pull/24376/head
parent
2a07b53879
commit
600ed6652c
|
@ -366,6 +366,13 @@ SELECT MEAN(usage_idle), MEAN(bytes_free) FROM cpu, disk WHERE time >= '2022-10-
|
|||
SELECT MEAN(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time <= '2022-10-31T02:00:10Z' GROUP BY TIME(2s) FILL(linear) LIMIT 3 OFFSET 1;
|
||||
SELECT MEAN(usage_idle), MEAN(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time <= '2022-10-31T02:00:10Z' GROUP BY TIME(2s) FILL(linear) LIMIT 3 OFFSET 1;
|
||||
|
||||
-- determines correct time range when multiple lower bounds
|
||||
-- expected: pick narrowest lower bound
|
||||
SELECT MEAN(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:04Z' AND time >= '2022-10-31T02:00:00Z' AND time <= '2022-10-31T02:00:10Z' GROUP BY TIME(2s) FILL(0);
|
||||
-- determines correct time range when multiple upper bounds
|
||||
-- expected: pick narrowest upper bound
|
||||
SELECT MEAN(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:00:10Z' AND time <= '2022-10-31T02:00:10Z' GROUP BY TIME(2s) FILL(0);
|
||||
|
||||
-- interpolates NULLs if there is no subsequent value
|
||||
SELECT MEAN(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:00:10Z' GROUP BY TIME(2s) FILL(linear);
|
||||
|
||||
|
|
|
@ -1447,6 +1447,27 @@ name: disk
|
|||
| 2022-10-31T02:00:04 | | 2236.0 |
|
||||
| 2022-10-31T02:00:06 | | 2237.0 |
|
||||
+---------------------+------+--------+
|
||||
-- InfluxQL: SELECT MEAN(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:04Z' AND time >= '2022-10-31T02:00:00Z' AND time <= '2022-10-31T02:00:10Z' GROUP BY TIME(2s) FILL(0);
|
||||
name: cpu
|
||||
+---------------------+--------------------+
|
||||
| time | mean |
|
||||
+---------------------+--------------------+
|
||||
| 2022-10-31T02:00:04 | 0.0 |
|
||||
| 2022-10-31T02:00:06 | 0.0 |
|
||||
| 2022-10-31T02:00:08 | 0.0 |
|
||||
| 2022-10-31T02:00:10 | 1.9900000000000002 |
|
||||
+---------------------+--------------------+
|
||||
-- InfluxQL: SELECT MEAN(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:00:10Z' AND time <= '2022-10-31T02:00:10Z' GROUP BY TIME(2s) FILL(0);
|
||||
name: cpu
|
||||
+---------------------+--------------------+
|
||||
| time | mean |
|
||||
+---------------------+--------------------+
|
||||
| 2022-10-31T02:00:00 | 1.9799999999999998 |
|
||||
| 2022-10-31T02:00:02 | 0.0 |
|
||||
| 2022-10-31T02:00:04 | 0.0 |
|
||||
| 2022-10-31T02:00:06 | 0.0 |
|
||||
| 2022-10-31T02:00:08 | 0.0 |
|
||||
+---------------------+--------------------+
|
||||
-- InfluxQL: SELECT MEAN(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:00:10Z' GROUP BY TIME(2s) FILL(linear);
|
||||
name: cpu
|
||||
+---------------------+--------------------+
|
||||
|
|
|
@ -3231,7 +3231,7 @@ mod test {
|
|||
plan("SELECT foo, f64_field FROM data where time > now() - 10s"), @r###"
|
||||
Sort: time ASC NULLS LAST, foo ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, f64_field:Float64;N]
|
||||
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, data.time AS time, data.foo AS foo, data.f64_field AS f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, f64_field:Float64;N]
|
||||
Filter: CAST(data.time AS Timestamp(Nanosecond, Some("+00:00"))) > TimestampNanosecond(1672531190000000000, Some("+00:00")) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
|
||||
Filter: data.time > TimestampNanosecond(1672531190000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
|
||||
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
|
||||
"###
|
||||
);
|
||||
|
@ -3252,7 +3252,7 @@ mod test {
|
|||
plan("SELECT foo, f64_field FROM data where now() - 10s < time"), @r###"
|
||||
Sort: time ASC NULLS LAST, foo ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, f64_field:Float64;N]
|
||||
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, data.time AS time, data.foo AS foo, data.f64_field AS f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, f64_field:Float64;N]
|
||||
Filter: TimestampNanosecond(1672531190000000000, Some("+00:00")) < CAST(data.time AS Timestamp(Nanosecond, Some("+00:00"))) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
|
||||
Filter: data.time > TimestampNanosecond(1672531190000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
|
||||
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
|
||||
"###
|
||||
);
|
||||
|
|
|
@ -121,6 +121,8 @@
|
|||
//! [`Reduce`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4850-L4852
|
||||
//! [`EvalBool`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4181-L4183
|
||||
//! [`Eval`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4137
|
||||
use std::cmp::Ordering;
|
||||
use std::ops::Bound;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::plan::error;
|
||||
|
@ -134,9 +136,10 @@ use datafusion::logical_expr::{
|
|||
binary_expr, cast, coalesce, lit, BinaryExpr, Expr, ExprSchemable, Operator,
|
||||
};
|
||||
use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
|
||||
use datafusion::optimizer::utils::{conjunction, disjunction};
|
||||
use datafusion::optimizer::utils::disjunction;
|
||||
use datafusion::physical_expr::execution_props::ExecutionProps;
|
||||
use datafusion::prelude::when;
|
||||
use datafusion_util::AsExpr;
|
||||
use observability_deps::tracing::trace;
|
||||
use predicate::rpc_predicate::{iox_expr_rewrite, simplify_predicate};
|
||||
|
||||
|
@ -160,7 +163,7 @@ pub(super) fn rewrite_conditional_expr(
|
|||
.and_then(|expr| expr.rewrite(&mut FixRegularExpressions { schemas }))
|
||||
.map(|expr| log_rewrite(expr, "after fix_regular_expressions"))
|
||||
// rewrite time predicates to behave like InfluxQL OG
|
||||
.and_then(rewrite_time_range_exprs)
|
||||
.and_then(|expr| rewrite_time_range_exprs(expr, &simplifier))
|
||||
.map(|expr| log_rewrite(expr, "after rewrite_time_range_exprs"))
|
||||
// rewrite exprs with incompatible operands to NULL or FALSE
|
||||
// (seems like FixRegularExpressions could be combined into this pass)
|
||||
|
@ -177,7 +180,7 @@ pub(super) fn rewrite_conditional_expr(
|
|||
.map(|expr| log_rewrite(expr, "after iox_expr_rewrite"))
|
||||
// Coerce operand types to be compatible:
|
||||
// - convert numeric types so that operands agree
|
||||
// - convert Utf8 to Dictonary as needed
|
||||
// - convert Utf8 to Dictionary as needed
|
||||
// The next step will fail with type errors if we don't do this.
|
||||
.and_then(|expr| simplifier.coerce(expr, Arc::clone(&schemas.df_schema)))
|
||||
.map(|expr| log_rewrite(expr, "after coerce"))
|
||||
|
@ -386,33 +389,27 @@ pub(super) fn rewrite_field_expr(expr: Expr, schemas: &Schemas) -> Result<Expr>
|
|||
/// WHERE
|
||||
/// cpu = 'cpu0'
|
||||
/// ```
|
||||
fn rewrite_time_range_exprs(expr: Expr) -> Result<Expr> {
|
||||
let mut has_or = false;
|
||||
let mut has_time_range = false;
|
||||
expr.apply(&mut |expr| {
|
||||
match expr {
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
op: Operator::Or, ..
|
||||
}) => has_or = true,
|
||||
expr @ Expr::BinaryExpr(_) if is_time_range(expr) => has_time_range = true,
|
||||
_ => return Ok(VisitRecursion::Continue),
|
||||
}
|
||||
fn rewrite_time_range_exprs(
|
||||
expr: Expr,
|
||||
simplifier: &ExprSimplifier<SimplifyContext<'_>>,
|
||||
) -> Result<Expr> {
|
||||
let has_time_range = matches!(
|
||||
expr.apply(&mut |expr| Ok(match expr {
|
||||
expr @ Expr::BinaryExpr(_) if is_time_range(expr) => {
|
||||
VisitRecursion::Stop
|
||||
}
|
||||
_ => VisitRecursion::Continue,
|
||||
}))
|
||||
.expect("infallible"),
|
||||
VisitRecursion::Stop
|
||||
);
|
||||
|
||||
Ok(if has_or && has_time_range {
|
||||
// no need to continue if we've found both
|
||||
VisitRecursion::Stop
|
||||
} else {
|
||||
VisitRecursion::Continue
|
||||
})
|
||||
})?;
|
||||
|
||||
// if there is no time range expressions or there are no OR operators,
|
||||
// we don't need to rewrite the expression
|
||||
if !has_time_range || !has_or {
|
||||
// Nothing to do if there are no time range expressions
|
||||
if !has_time_range {
|
||||
return Ok(expr);
|
||||
}
|
||||
|
||||
let mut rw = SeparateTimeRanges::default();
|
||||
let mut rw = SeparateTimeRange::new(simplifier);
|
||||
expr.visit(&mut rw)?;
|
||||
|
||||
// When `expr` contains both time expressions using relational
|
||||
|
@ -421,12 +418,12 @@ fn rewrite_time_range_exprs(expr: Expr) -> Result<Expr> {
|
|||
// WHERE time > now() - 5s OR time = '2004-04-09:12:00:00Z' AND cpu = 'cpu0'
|
||||
//
|
||||
// the entire expression evaluates to `false` to be consistent with InfluxQL.
|
||||
if !rw.relational.is_empty() && !rw.equality.is_empty() {
|
||||
if !rw.time_range.is_unbounded() && !rw.equality.is_empty() {
|
||||
return Ok(lit(false));
|
||||
}
|
||||
|
||||
let lhs = if !rw.relational.is_empty() {
|
||||
conjunction(rw.relational)
|
||||
let lhs = if let Some(expr) = rw.time_range.as_expr() {
|
||||
Some(expr)
|
||||
} else if !rw.equality.is_empty() {
|
||||
disjunction(rw.equality)
|
||||
} else {
|
||||
|
@ -475,15 +472,205 @@ fn is_time_range(expr: &Expr) -> bool {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct SeparateTimeRanges {
|
||||
/// Represents the lower bound, in nanoseconds, of a [`TimeRange`].
|
||||
struct LowerBound(Bound<i64>);
|
||||
|
||||
impl LowerBound {
|
||||
/// Create a new, time bound that is unbounded
|
||||
fn unbounded() -> Self {
|
||||
Self(Bound::Unbounded)
|
||||
}
|
||||
|
||||
/// Create a new, time bound that includes `v`
|
||||
fn included(v: i64) -> Self {
|
||||
Self(Bound::Included(v))
|
||||
}
|
||||
|
||||
/// Create a new, time bound that excludes `v`
|
||||
fn excluded(v: i64) -> Self {
|
||||
Self(Bound::Excluded(v))
|
||||
}
|
||||
|
||||
/// Returns `true` if the receiver is unbounded.
|
||||
fn is_unbounded(&self) -> bool {
|
||||
matches!(self.0, Bound::Unbounded)
|
||||
}
|
||||
|
||||
fn as_expr(&self) -> Option<Expr> {
|
||||
match self.0 {
|
||||
Bound::Included(ts) => Some(
|
||||
"time"
|
||||
.as_expr()
|
||||
.gt_eq(lit(ScalarValue::TimestampNanosecond(Some(ts), None))),
|
||||
),
|
||||
Bound::Excluded(ts) => Some(
|
||||
"time"
|
||||
.as_expr()
|
||||
.gt(lit(ScalarValue::TimestampNanosecond(Some(ts), None))),
|
||||
),
|
||||
Bound::Unbounded => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LowerBound {
|
||||
fn default() -> Self {
|
||||
Self::unbounded()
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for LowerBound {}
|
||||
|
||||
impl PartialEq<Self> for LowerBound {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.cmp(other) == Ordering::Equal
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd<Self> for LowerBound {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for LowerBound {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
match (self.0, other.0) {
|
||||
(Bound::Unbounded, Bound::Unbounded) => Ordering::Equal,
|
||||
(Bound::Unbounded, _) => Ordering::Less,
|
||||
(_, Bound::Unbounded) => Ordering::Greater,
|
||||
(Bound::Included(a), Bound::Included(b)) | (Bound::Excluded(a), Bound::Excluded(b)) => {
|
||||
a.cmp(&b)
|
||||
}
|
||||
(Bound::Included(a), Bound::Excluded(b)) => match a.cmp(&b) {
|
||||
Ordering::Equal => Ordering::Less,
|
||||
// We know that if a > b, b + 1 is safe from overflow
|
||||
Ordering::Greater if a == b + 1 => Ordering::Equal,
|
||||
ordering => ordering,
|
||||
},
|
||||
(Bound::Excluded(a), Bound::Included(b)) => match a.cmp(&b) {
|
||||
Ordering::Equal => Ordering::Greater,
|
||||
// We know that if a < b, a + 1 is safe from overflow
|
||||
Ordering::Less if a + 1 == b => Ordering::Equal,
|
||||
ordering => ordering,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents the upper bound, in nanoseconds, of a [`TimeRange`].
|
||||
struct UpperBound(Bound<i64>);
|
||||
|
||||
impl UpperBound {
|
||||
/// Create a new, unbounded upper bound.
|
||||
fn unbounded() -> Self {
|
||||
Self(Bound::Unbounded)
|
||||
}
|
||||
|
||||
/// Create a new, upper bound that includes `v`
|
||||
fn included(v: i64) -> Self {
|
||||
Self(Bound::Included(v))
|
||||
}
|
||||
|
||||
/// Create a new, upper bound that excludes `v`
|
||||
fn excluded(v: i64) -> Self {
|
||||
Self(Bound::Excluded(v))
|
||||
}
|
||||
|
||||
/// Returns `true` if the receiver is unbounded.
|
||||
fn is_unbounded(&self) -> bool {
|
||||
matches!(self.0, Bound::Unbounded)
|
||||
}
|
||||
|
||||
fn as_expr(&self) -> Option<Expr> {
|
||||
match self.0 {
|
||||
Bound::Included(ts) => Some(
|
||||
"time"
|
||||
.as_expr()
|
||||
.lt_eq(lit(ScalarValue::TimestampNanosecond(Some(ts), None))),
|
||||
),
|
||||
Bound::Excluded(ts) => Some(
|
||||
"time"
|
||||
.as_expr()
|
||||
.lt(lit(ScalarValue::TimestampNanosecond(Some(ts), None))),
|
||||
),
|
||||
Bound::Unbounded => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for UpperBound {
|
||||
fn default() -> Self {
|
||||
Self::unbounded()
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for UpperBound {}
|
||||
|
||||
impl PartialEq<Self> for UpperBound {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.cmp(other) == Ordering::Equal
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd<Self> for UpperBound {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for UpperBound {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
match (self.0, other.0) {
|
||||
(Bound::Unbounded, Bound::Unbounded) => Ordering::Equal,
|
||||
(Bound::Unbounded, _) => Ordering::Greater,
|
||||
(_, Bound::Unbounded) => Ordering::Less,
|
||||
(Bound::Included(a), Bound::Included(b)) | (Bound::Excluded(a), Bound::Excluded(b)) => {
|
||||
a.cmp(&b)
|
||||
}
|
||||
(Bound::Included(a), Bound::Excluded(b)) => match a.cmp(&b) {
|
||||
Ordering::Equal => Ordering::Greater,
|
||||
// We know that if a < b, b - 1 is safe from underflow
|
||||
Ordering::Less if a == b - 1 => Ordering::Equal,
|
||||
ordering => ordering,
|
||||
},
|
||||
(Bound::Excluded(a), Bound::Included(b)) => match a.cmp(&b) {
|
||||
Ordering::Equal => Ordering::Less,
|
||||
// We know that if a > b, a - 1 is safe from overflow
|
||||
Ordering::Greater if a - 1 == b => Ordering::Equal,
|
||||
ordering => ordering,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a time range, with a single lower and upper bound.
|
||||
#[derive(Default, PartialEq, Eq)]
|
||||
struct TimeRange {
|
||||
lower: LowerBound,
|
||||
upper: UpperBound,
|
||||
}
|
||||
|
||||
impl TimeRange {
|
||||
/// Returns `true` if the time range is unbounded.
|
||||
fn is_unbounded(&self) -> bool {
|
||||
self.lower.is_unbounded() && self.upper.is_unbounded()
|
||||
}
|
||||
|
||||
/// Returns the time range as a conditional expression.
|
||||
fn as_expr(&self) -> Option<Expr> {
|
||||
match (self.lower.as_expr(), self.upper.as_expr()) {
|
||||
(None, None) => None,
|
||||
(Some(e), None) | (None, Some(e)) => Some(e),
|
||||
(Some(lower), Some(upper)) => Some(lower.and(upper)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct SeparateTimeRange<'a> {
|
||||
simplifier: &'a ExprSimplifier<SimplifyContext<'a>>,
|
||||
stack: Vec<Option<Expr>>,
|
||||
/// Relational time range expressions, such as:
|
||||
///
|
||||
/// ```sql
|
||||
/// time >= now()
|
||||
/// ```
|
||||
relational: Vec<Expr>,
|
||||
time_range: TimeRange,
|
||||
|
||||
/// Equality time range expressions, such as:
|
||||
///
|
||||
|
@ -492,7 +679,18 @@ struct SeparateTimeRanges {
|
|||
equality: Vec<Expr>,
|
||||
}
|
||||
|
||||
impl TreeNodeVisitor for SeparateTimeRanges {
|
||||
impl<'a> SeparateTimeRange<'a> {
|
||||
fn new(simplifier: &'a ExprSimplifier<SimplifyContext<'_>>) -> Self {
|
||||
Self {
|
||||
simplifier,
|
||||
stack: vec![],
|
||||
time_range: Default::default(),
|
||||
equality: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> TreeNodeVisitor for SeparateTimeRange<'a> {
|
||||
type N = Expr;
|
||||
|
||||
fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
|
||||
|
@ -507,19 +705,96 @@ impl TreeNodeVisitor for SeparateTimeRanges {
|
|||
use Operator::*;
|
||||
|
||||
match node {
|
||||
node @ Expr::BinaryExpr(_) if is_time_range(node) => {
|
||||
// A binary expression where either the left or right
|
||||
// expression refers to the "time" column.
|
||||
//
|
||||
// "time" OP expression
|
||||
// expression OP "time"
|
||||
//
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
left,
|
||||
op: op @ (Eq | NotEq | Gt | Lt | GtEq | LtEq),
|
||||
right,
|
||||
}) if is_time_column(left) | is_time_column(right) => {
|
||||
self.stack.push(None);
|
||||
// separate equality from relational time-range expressions
|
||||
if matches!(
|
||||
node,
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
op: Gt | GtEq | Lt | LtEq,
|
||||
..
|
||||
})
|
||||
) {
|
||||
self.relational.push(node.clone())
|
||||
|
||||
if matches!(op, Eq | NotEq) {
|
||||
let node = self.simplifier.simplify(node.clone())?;
|
||||
self.equality.push(node);
|
||||
return Ok(VisitRecursion::Continue);
|
||||
}
|
||||
|
||||
/// Op is the limited set of operators expected from here on,
|
||||
/// to avoid repeated wildcard match arms with unreachable!().
|
||||
enum Op {
|
||||
Gt,
|
||||
GtEq,
|
||||
Lt,
|
||||
LtEq,
|
||||
}
|
||||
|
||||
// Map the DataFusion Operator to Op
|
||||
let op = match op {
|
||||
Gt => Op::Gt,
|
||||
GtEq => Op::GtEq,
|
||||
Lt => Op::Lt,
|
||||
LtEq => Op::LtEq,
|
||||
_ => unreachable!("expected: Gt | Lt | GtEq | LtEq"),
|
||||
};
|
||||
|
||||
let (expr, op) = if is_time_column(left) {
|
||||
(right, op)
|
||||
} else {
|
||||
self.equality.push(node.clone())
|
||||
// swap the operators when the conditional is `expression OP "time"`
|
||||
(
|
||||
left,
|
||||
match op {
|
||||
Op::Gt => Op::Lt,
|
||||
Op::GtEq => Op::LtEq,
|
||||
Op::Lt => Op::Gt,
|
||||
Op::LtEq => Op::GtEq,
|
||||
},
|
||||
)
|
||||
};
|
||||
|
||||
// resolve `now()` and reduce binary expressions to a single constant
|
||||
let expr = self.simplifier.simplify(*expr.clone())?;
|
||||
|
||||
let ts = match expr {
|
||||
Expr::Literal(ScalarValue::TimestampNanosecond(Some(ts), _)) => ts,
|
||||
expr => {
|
||||
return error::internal(format!(
|
||||
"expected TimestampNanosecond, got: {}",
|
||||
expr
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
match op {
|
||||
Op::Gt => {
|
||||
let ts = LowerBound::excluded(ts);
|
||||
if ts > self.time_range.lower {
|
||||
self.time_range.lower = ts;
|
||||
}
|
||||
}
|
||||
Op::GtEq => {
|
||||
let ts = LowerBound::included(ts);
|
||||
if ts > self.time_range.lower {
|
||||
self.time_range.lower = ts;
|
||||
}
|
||||
}
|
||||
Op::Lt => {
|
||||
let ts = UpperBound::excluded(ts);
|
||||
if ts < self.time_range.upper {
|
||||
self.time_range.upper = ts;
|
||||
}
|
||||
}
|
||||
Op::LtEq => {
|
||||
let ts = UpperBound::included(ts);
|
||||
if ts < self.time_range.upper {
|
||||
self.time_range.upper = ts;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(VisitRecursion::Continue)
|
||||
|
@ -910,43 +1185,85 @@ mod test {
|
|||
fn test_rewrite_time_range_exprs() {
|
||||
use datafusion::common::ScalarValue as V;
|
||||
|
||||
let rewrite = |expr| rewrite_time_range_exprs(expr).unwrap().to_string();
|
||||
test_helpers::maybe_start_logging();
|
||||
|
||||
let props = execution_props();
|
||||
let (schemas, _) = new_schemas();
|
||||
let simplify_context =
|
||||
SimplifyContext::new(&props).with_schema(Arc::clone(&schemas.df_schema));
|
||||
let simplifier = ExprSimplifier::new(simplify_context);
|
||||
|
||||
let rewrite = |expr| {
|
||||
rewrite_time_range_exprs(expr, &simplifier)
|
||||
.unwrap()
|
||||
.to_string()
|
||||
};
|
||||
|
||||
// does not need a rewrite
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 1000)));
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= TimestampNanosecond(1672531199000000000, None)"#
|
||||
);
|
||||
|
||||
assert_eq!(rewrite(expr), r#"time >= now() - IntervalDayTime("1000")"#);
|
||||
// reduces the lower bound to a single expression
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 1000)))
|
||||
.and(
|
||||
"time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 500))),
|
||||
);
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= TimestampNanosecond(1672531199500000000, None)"#
|
||||
);
|
||||
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.lt_eq(now() - lit(V::new_interval_dt(0, 1000)));
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time <= TimestampNanosecond(1672531199000000000, None)"#
|
||||
);
|
||||
|
||||
// reduces the upper bound to a single expression
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.lt_eq(now() + lit(V::new_interval_dt(0, 1000)))
|
||||
.and(
|
||||
"time"
|
||||
.as_expr()
|
||||
.lt_eq(now() + lit(V::new_interval_dt(0, 500))),
|
||||
);
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time <= TimestampNanosecond(1672531200500000000, None)"#
|
||||
);
|
||||
|
||||
// does not need a rewrite
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 1000)))
|
||||
.and("time".as_expr().lt(now()));
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= now() - IntervalDayTime("1000") AND time < now()"#
|
||||
r#"time >= TimestampNanosecond(1672531199000000000, None) AND time < TimestampNanosecond(1672531200000000000, None)"#
|
||||
);
|
||||
|
||||
// does not need a rewrite
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 1000)))
|
||||
.and("cpu".as_expr().eq(lit("cpu0")));
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= now() - IntervalDayTime("1000") AND cpu = Utf8("cpu0")"#
|
||||
r#"time >= TimestampNanosecond(1672531199000000000, None) AND cpu = Utf8("cpu0")"#
|
||||
);
|
||||
|
||||
// does not need a rewrite
|
||||
let expr = "time".as_expr().eq(lit_timestamp_nano(0));
|
||||
assert_eq!(rewrite(expr), r#"time = TimestampNanosecond(0, None)"#);
|
||||
|
||||
// The following expressions require rewrites to promote the time range to the top
|
||||
// of the expression tree
|
||||
|
||||
// instance = 'instance-01' OR instance = 'instance-02' AND time >= now() - 60s
|
||||
let expr = "instance"
|
||||
.as_expr()
|
||||
.eq(lit("instance-01"))
|
||||
|
@ -958,10 +1275,9 @@ mod test {
|
|||
);
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= now() - IntervalDayTime("60000") AND (instance = Utf8("instance-01") OR instance = Utf8("instance-02"))"#
|
||||
r#"time >= TimestampNanosecond(1672531140000000000, None) AND (instance = Utf8("instance-01") OR instance = Utf8("instance-02"))"#
|
||||
);
|
||||
|
||||
// time >= now - 60s AND time < now() AND cpu = 'cpu0' OR cpu = 'cpu1'
|
||||
let expr = "time"
|
||||
.as_expr()
|
||||
.gt_eq(now() - lit(V::new_interval_dt(0, 60_000)))
|
||||
|
@ -974,7 +1290,7 @@ mod test {
|
|||
);
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= now() - IntervalDayTime("60000") AND time < now() AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"#
|
||||
r#"time >= TimestampNanosecond(1672531140000000000, None) AND time < TimestampNanosecond(1672531200000000000, None) AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"#
|
||||
);
|
||||
|
||||
// time >= now - 60s AND time < now() OR cpu = 'cpu0' OR cpu = 'cpu1'
|
||||
|
@ -990,7 +1306,7 @@ mod test {
|
|||
.or("cpu".as_expr().eq(lit("cpu1"))));
|
||||
assert_eq!(
|
||||
rewrite(expr),
|
||||
r#"time >= now() - IntervalDayTime("60000") AND time < now() AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"#
|
||||
r#"time >= TimestampNanosecond(1672531140000000000, None) AND time < TimestampNanosecond(1672531200000000000, None) AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"#
|
||||
);
|
||||
|
||||
// time = 0 OR time = 10 AND cpu = 'cpu0'
|
||||
|
@ -1417,4 +1733,119 @@ mod test {
|
|||
let expr = col("string_field").not_eq(lit("foo"));
|
||||
assert_eq!(rewrite(expr), r#"string_field != Utf8("foo")"#);
|
||||
}
|
||||
|
||||
#[test]
fn test_lower_bound_cmp() {
    // Short aliases for the three constructors keep each case on one line.
    let unb = LowerBound::unbounded;
    let inc = LowerBound::included;
    let exc = LowerBound::excluded;

    // unbounded vs unbounded
    assert!(unb() == unb());

    // included vs included
    assert!(inc(5) == inc(5));
    assert!(inc(5) < inc(6));
    // `time >= 6` is greater than `time >= 5`
    assert!(inc(6) > inc(5));

    // excluded vs excluded
    assert!(exc(5) == exc(5));
    assert!(exc(5) < exc(6));
    assert!(exc(6) > exc(5));

    // unbounded sorts below every bounded value
    assert!(unb() < inc(5));
    assert!(unb() < exc(5));
    assert!(inc(5) > unb());
    assert!(exc(5) > unb());

    // included vs excluded: `time > n` behaves like `time >= n + 1`
    assert!(inc(5) < exc(5));
    assert!(inc(5) == exc(4));
    assert!(inc(5) < exc(6));
    assert!(inc(6) == exc(5));

    // excluded vs included
    assert!(exc(5) > inc(5));
    assert!(exc(5) == inc(6));
    assert!(exc(6) > inc(5));
}
|
||||
|
||||
#[test]
fn test_upper_bound_cmp() {
    // Short aliases for the three constructors keep each case on one line.
    let unb = UpperBound::unbounded;
    let inc = UpperBound::included;
    let exc = UpperBound::excluded;

    // unbounded vs unbounded
    assert!(unb() == unb());

    // included vs included
    assert!(inc(5) == inc(5));
    assert!(inc(5) < inc(6));
    assert!(inc(6) > inc(5));

    // excluded vs excluded
    assert!(exc(5) == exc(5));
    assert!(exc(5) < exc(6));
    assert!(exc(6) > exc(5));

    // unbounded sorts above every bounded value
    assert!(unb() > inc(5));
    assert!(unb() > exc(5));
    assert!(inc(5) < unb());
    assert!(exc(5) < unb());

    // included vs excluded: `time < n` behaves like `time <= n - 1`
    assert!(inc(5) > exc(5));
    assert!(inc(5) > exc(4));
    assert!(inc(5) == exc(6));
    assert!(inc(5) < exc(7));

    // excluded vs included
    assert!(exc(5) < inc(5));
    assert!(exc(5) < inc(6));
    assert!(exc(5) == inc(4));
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue