From d5719f9be290aefd81e3dd059bd14517af6dcc60 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Fri, 2 Jun 2023 09:50:01 +1000 Subject: [PATCH] refactor: Moved simplification of time range expressions to parser --- Cargo.lock | 1 + influxdb_influxql_parser/Cargo.toml | 1 + .../src/expression/arithmetic.rs | 50 +- influxdb_influxql_parser/src/lib.rs | 4 +- influxdb_influxql_parser/src/literal.rs | 30 +- influxdb_influxql_parser/src/time_range.rs | 542 ++++++++++++++++ .../src}/timestamp.rs | 17 +- iox_query_influxql/Cargo.toml | 2 +- iox_query_influxql/src/plan/error.rs | 9 + .../plan/influxql_time_range_expression.rs | 582 +----------------- iox_query_influxql/src/plan/mod.rs | 1 - iox_query_influxql/src/plan/planner.rs | 33 +- .../src/plan/planner/time_range.rs | 407 ++++++------ 13 files changed, 905 insertions(+), 774 deletions(-) create mode 100644 influxdb_influxql_parser/src/time_range.rs rename {iox_query_influxql/src/plan => influxdb_influxql_parser/src}/timestamp.rs (95%) diff --git a/Cargo.lock b/Cargo.lock index 25429a9226..c901fd4d58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2490,6 +2490,7 @@ dependencies = [ "chrono-tz", "insta", "nom", + "num-integer", "num-traits", "once_cell", "paste", diff --git a/influxdb_influxql_parser/Cargo.toml b/influxdb_influxql_parser/Cargo.toml index a789ed7355..55bb07543a 100644 --- a/influxdb_influxql_parser/Cargo.toml +++ b/influxdb_influxql_parser/Cargo.toml @@ -10,6 +10,7 @@ nom = { version = "7", default-features = false, features = ["std"] } once_cell = "1" chrono = { version = "0.4", default-features = false, features = ["std"] } chrono-tz = { version = "0.8" } +num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] } num-traits = "0.2" workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/influxdb_influxql_parser/src/expression/arithmetic.rs b/influxdb_influxql_parser/src/expression/arithmetic.rs index 5321f83108..59f9e2147a 100644 --- a/influxdb_influxql_parser/src/expression/arithmetic.rs +++ b/influxdb_influxql_parser/src/expression/arithmetic.rs @@ -2,7 +2,8 @@ use crate::common::ws0; use crate::identifier::unquoted_identifier; use crate::internal::{expect, Error, ParseError, ParseResult}; use crate::keywords::keyword; -use crate::literal::literal_regex; +use crate::literal::{literal_regex, Duration}; +use crate::timestamp::Timestamp; use crate::{ identifier::{identifier, Identifier}, literal::Literal, @@ -633,6 +634,53 @@ fn reduce_expr(expr: Expr, remainder: Vec<(BinaryOperator, Expr)>) -> Expr { }) } +/// Trait for converting a type to a [`Expr::Literal`] expression. +pub trait LiteralExpr { + /// Convert the receiver to a literal expression. + fn lit(self) -> Expr; +} + +/// Convert `v` to a literal expression. +pub fn lit(v: T) -> Expr { + v.lit() +} + +impl LiteralExpr for Literal { + fn lit(self) -> Expr { + Expr::Literal(self) + } +} + +impl LiteralExpr for Duration { + fn lit(self) -> Expr { + Expr::Literal(Literal::Duration(self)) + } +} + +impl LiteralExpr for i64 { + fn lit(self) -> Expr { + Expr::Literal(Literal::Integer(self)) + } +} + +impl LiteralExpr for f64 { + fn lit(self) -> Expr { + Expr::Literal(Literal::Float(self)) + } +} + +impl LiteralExpr for String { + fn lit(self) -> Expr { + Expr::Literal(Literal::String(self)) + } +} + +impl LiteralExpr for Timestamp { + fn lit(self) -> Expr { + Expr::Literal(Literal::Timestamp(self)) + } +} + #[cfg(test)] mod test { use super::*; diff --git a/influxdb_influxql_parser/src/lib.rs b/influxdb_influxql_parser/src/lib.rs index 29ef9aca73..33968836d5 100644 --- a/influxdb_influxql_parser/src/lib.rs +++ b/influxdb_influxql_parser/src/lib.rs @@ -17,7 +17,7 @@ )] // Workaround for "unused crate" lint false positives. -use workspace_hack as _; +// use workspace_hack as _; use crate::common::{statement_terminator, ws0}; use crate::internal::Error as InternalError; @@ -51,6 +51,8 @@ pub mod show_tag_values; pub mod simple_from_clause; pub mod statement; pub mod string; +pub mod time_range; +pub mod timestamp; pub mod visit; pub mod visit_mut; diff --git a/influxdb_influxql_parser/src/literal.rs b/influxdb_influxql_parser/src/literal.rs index c263c12b9f..67e19574c9 100644 --- a/influxdb_influxql_parser/src/literal.rs +++ b/influxdb_influxql_parser/src/literal.rs @@ -4,8 +4,9 @@ use crate::common::ws0; use crate::internal::{map_error, map_fail, ParseResult}; use crate::keywords::keyword; use crate::string::{regex, single_quoted_string, Regex}; +use crate::timestamp::Timestamp; use crate::{impl_tuple_clause, write_escaped}; -use chrono::{DateTime, FixedOffset}; +use chrono::{NaiveDateTime, Offset}; use nom::branch::alt; use nom::bytes::complete::tag; use nom::character::complete::{char, digit0, digit1}; @@ -55,7 +56,7 @@ pub enum Literal { Regex(Regex), /// A timestamp identified in a time range expression of a conditional expression. - Timestamp(DateTime), + Timestamp(Timestamp), } impl From for Literal { @@ -353,6 +354,17 @@ pub(crate) fn literal_regex(i: &str) -> ParseResult<&str, Literal> { map(regex, Literal::Regex)(i) } +/// Returns `nanos` as a timestamp. +pub fn nanos_to_timestamp(nanos: i64) -> Timestamp { + let (secs, nsec) = num_integer::div_mod_floor(nanos, NANOS_PER_SEC); + + Timestamp::from_utc( + NaiveDateTime::from_timestamp_opt(secs, nsec as u32) + .expect("unable to convert duration to timestamp"), + chrono::Utc.fix(), + ) +} + #[cfg(test)] mod test { use super::*; @@ -572,4 +584,18 @@ mod test { let (_, got) = number("+ 501").unwrap(); assert_matches!(got, Number::Integer(v) if v == 501); } + + #[test] + fn test_nanos_to_timestamp() { + let ts = nanos_to_timestamp(0); + assert_eq!(ts.to_rfc3339(), "1970-01-01T00:00:00+00:00"); + + // infallible + let ts = nanos_to_timestamp(i64::MAX); + assert_eq!(ts.timestamp_nanos(), i64::MAX); + + // let ts = nanos_to_timestamp(i64::MIN); + // This line panics with an arithmetic overflow. + // assert_eq!(ts.timestamp_nanos(), i64::MIN); + } } diff --git a/influxdb_influxql_parser/src/time_range.rs b/influxdb_influxql_parser/src/time_range.rs new file mode 100644 index 0000000000..91052b6482 --- /dev/null +++ b/influxdb_influxql_parser/src/time_range.rs @@ -0,0 +1,542 @@ +//! Process InfluxQL time range expressions +//! +use crate::expression::walk::{walk_expression, Expression}; +use crate::expression::{lit, Binary, BinaryOperator, ConditionalExpression, Expr, VarRef}; +use crate::functions::is_now_function; +use crate::literal::{nanos_to_timestamp, Duration, Literal}; +use crate::timestamp::{parse_timestamp, Timestamp}; +use std::ops::ControlFlow; + +/// Result type for operations that could result in an [`ExprError`]. +pub type ExprResult = Result; + +#[allow(dead_code)] +fn split_cond( + cond: &ConditionalExpression, +) -> (Option, Option) { + // search the tree for an expression involving `time`. + let no_time = walk_expression(cond, &mut |e| { + if let Expression::Conditional(cond) = e { + if is_time_field(cond) { + return ControlFlow::Break(()); + } + } + ControlFlow::Continue(()) + }) + .is_continue(); + + if no_time { + return (None, None); + } + + unimplemented!() +} + +/// Simplifies an InfluxQL duration `expr` to a nanosecond interval represented as an `i64`. +pub fn duration_expr_to_nanoseconds(expr: &Expr) -> Result { + let ctx = ReduceContext::default(); + match reduce_expr(&ctx, expr)? { + Expr::Literal(Literal::Duration(v)) => Ok(*v), + Expr::Literal(Literal::Float(v)) => Ok(v as i64), + Expr::Literal(Literal::Integer(v)) => Ok(v), + _ => error::expr("invalid duration expression"), + } +} + +/// Represents an error that occurred whilst simplifying an InfluxQL expression. +#[derive(Debug)] +pub enum ExprError { + /// An error in the expression that can be resolved by the client. + Expression(String), + + /// An internal error that signals a bug. + Internal(String), +} + +/// Helper functions for creating errors. +mod error { + use super::ExprError; + + pub(crate) fn expr(s: impl Into) -> Result { + Err(map::expr(s)) + } + + pub(crate) mod map { + use super::*; + + pub(crate) fn expr(s: impl Into) -> ExprError { + ExprError::Expression(s.into()) + } + } +} + +/// Context used when simplifying InfluxQL time range expressions. +#[derive(Default, Debug, Clone, Copy)] +pub struct ReduceContext { + /// The value for the `now()` function. + pub now: Option, + /// The timezone to evaluate literal timestamp strings. + pub tz: Option, +} + +#[allow(dead_code)] +fn reduce( + ctx: &ReduceContext, + cond: &ConditionalExpression, +) -> Result { + Ok(match cond { + ConditionalExpression::Expr(expr) => { + ConditionalExpression::Expr(Box::new(reduce_expr(ctx, expr)?)) + } + ConditionalExpression::Binary(_) => unimplemented!(), + ConditionalExpression::Grouped(_) => unimplemented!(), + }) +} + +/// Simplify the time range expression. +pub fn reduce_time_expr(ctx: &ReduceContext, expr: &Expr) -> ExprResult { + match reduce_expr(ctx, expr)? { + expr @ Expr::Literal(Literal::Timestamp(_)) => Ok(expr), + Expr::Literal(Literal::String(ref s)) => { + parse_timestamp_expr(s, ctx.tz).map_err(map_expr_err(expr)) + } + Expr::Literal(Literal::Duration(v)) => Ok(lit(nanos_to_timestamp(*v))), + Expr::Literal(Literal::Float(v)) => Ok(lit(nanos_to_timestamp(v as i64))), + Expr::Literal(Literal::Integer(v)) => Ok(lit(nanos_to_timestamp(v))), + _ => error::expr("invalid time range expression"), + } +} + +fn reduce_expr(ctx: &ReduceContext, expr: &Expr) -> ExprResult { + match expr { + Expr::Binary(ref v) => reduce_binary_expr(ctx, v).map_err(map_expr_err(expr)), + Expr::Call (call) if is_now_function(call.name.as_str()) => ctx.now.map(lit).ok_or_else(|| ExprError::Internal("unable to resolve now".into())), + Expr::Call (call) => { + error::expr( + format!("invalid function call '{}'", call.name), + ) + } + Expr::Nested(expr) => reduce_expr(ctx, expr), + Expr::Literal(val) => match val { + Literal::Integer(_) | + Literal::Float(_) | + Literal::String(_) | + Literal::Timestamp(_) | + Literal::Duration(_) => Ok(Expr::Literal(val.clone())), + _ => error::expr(format!( + "found literal '{val}', expected duration, float, integer, or timestamp string" + )), + }, + + Expr::VarRef { .. } | Expr::BindParameter(_) | Expr::Wildcard(_) | Expr::Distinct(_) => error::expr(format!( + "found symbol '{expr}', expected now() or a literal duration, float, integer and timestamp string" + )), + } +} + +fn reduce_binary_expr(ctx: &ReduceContext, expr: &Binary) -> ExprResult { + let lhs = reduce_expr(ctx, &expr.lhs)?; + let op = expr.op; + let rhs = reduce_expr(ctx, &expr.rhs)?; + + match lhs { + Expr::Literal(Literal::Duration(v)) => reduce_binary_lhs_duration(ctx, v, op, rhs), + Expr::Literal(Literal::Integer(v)) => reduce_binary_lhs_integer(ctx, v, op, rhs), + Expr::Literal(Literal::Float(v)) => reduce_binary_lhs_float(v, op, rhs), + Expr::Literal(Literal::Timestamp(v)) => reduce_binary_lhs_timestamp(ctx, v, op, rhs), + Expr::Literal(Literal::String(v)) => reduce_binary_lhs_string(ctx, v, op, rhs), + _ => Ok(Expr::Binary(Binary { + lhs: Box::new(lhs), + op, + rhs: Box::new(rhs), + })), + } +} + +/// Reduce `duration OP expr`. +/// +/// ```text +/// duration = duration ( ADD | SUB ) ( duration | NOW() ) +/// duration = duration ( MUL | DIV ) ( float | integer ) +/// timestamp = duration ADD string +/// timestamp = duration ADD timestamp +/// ``` +fn reduce_binary_lhs_duration( + ctx: &ReduceContext, + lhs: Duration, + op: BinaryOperator, + rhs: Expr, +) -> ExprResult { + match rhs { + Expr::Literal(ref val) => match val { + // durations may be added and subtracted from other durations + Literal::Duration(Duration(v)) => match op { + BinaryOperator::Add => Ok(lit(Duration( + lhs.checked_add(*v) + .ok_or_else(|| error::map::expr("overflow"))?, + ))), + BinaryOperator::Sub => Ok(lit(Duration( + lhs.checked_sub(*v) + .ok_or_else(|| error::map::expr("overflow"))?, + ))), + _ => error::expr(format!("found operator '{op}', expected +, -")), + }, + // durations may only be scaled by float literals + Literal::Float(v) => { + reduce_binary_lhs_duration(ctx, lhs, op, Expr::Literal(Literal::Integer(*v as i64))) + } + Literal::Integer(v) => match op { + BinaryOperator::Mul => Ok(lit(Duration(*lhs * *v))), + BinaryOperator::Div => Ok(lit(Duration(*lhs / *v))), + _ => error::expr(format!("found operator '{op}', expected *, /")), + }, + // A timestamp may be added to a duration + Literal::Timestamp(v) if matches!(op, BinaryOperator::Add) => { + Ok(lit(*v + chrono::Duration::nanoseconds(*lhs))) + } + Literal::String(v) => { + reduce_binary_lhs_duration(ctx, lhs, op, parse_timestamp_expr(v, ctx.tz)?) + } + // This should not occur, as acceptable literals are validated in `reduce_expr`. + _ => Err(ExprError::Internal(format!( + "unexpected literal '{rhs}' for duration expression" + ))), + }, + _ => error::expr("invalid duration expression"), + } +} + +/// Reduce `integer OP expr`. +/// +/// ```text +/// integer = integer ( ADD | SUB | MUL | DIV | MOD | BitwiseAND | BitwiseOR | BitwiseXOR ) integer +/// float = integer as float OP float +/// timestamp = integer as timestamp OP duration +/// ``` +fn reduce_binary_lhs_integer( + ctx: &ReduceContext, + lhs: i64, + op: BinaryOperator, + rhs: Expr, +) -> ExprResult { + match rhs { + Expr::Literal(Literal::Float(_)) => reduce_binary_lhs_float(lhs as f64, op, rhs), + Expr::Literal(Literal::Integer(v)) => Ok(lit(op.reduce(lhs, v))), + Expr::Literal(Literal::Duration(_)) => { + reduce_binary_lhs_timestamp(ctx, nanos_to_timestamp(lhs), op, rhs) + } + Expr::Literal(Literal::String(v)) => { + reduce_binary_lhs_duration(ctx, Duration(lhs), op, parse_timestamp_expr(&v, ctx.tz)?) + } + _ => error::expr("invalid integer expression"), + } +} + +/// Reduce `float OP expr`. +/// +/// ```text +/// float = float ( ADD | SUB | MUL | DIV | MOD ) ( float | integer) +/// ``` +fn reduce_binary_lhs_float(lhs: f64, op: BinaryOperator, rhs: Expr) -> ExprResult { + Ok(lit(match rhs { + Expr::Literal(Literal::Float(v)) => op + .try_reduce(lhs, v) + .ok_or_else(|| error::map::expr("invalid operator for float expression"))?, + Expr::Literal(Literal::Integer(v)) => op + .try_reduce(lhs, v) + .ok_or_else(|| error::map::expr("invalid operator for float expression"))?, + _ => return error::expr("invalid float expression"), + })) +} + +/// Reduce `timestamp OP expr`. +/// +/// The right-hand `expr` must be of a type that can be +/// coalesced to a duration, which includes a `duration`, `integer` or a +/// `string`. A `string` is parsed as a timestamp an interpreted as +/// the number of nanoseconds from the Unix epoch. +/// +/// ```text +/// timestamp = timestamp ( ADD | SUB ) ( duration | integer | string | timestamp ) +/// ``` +fn reduce_binary_lhs_timestamp( + ctx: &ReduceContext, + lhs: Timestamp, + op: BinaryOperator, + rhs: Expr, +) -> ExprResult { + match rhs { + Expr::Literal(Literal::Duration(d)) => match op { + BinaryOperator::Add => Ok(lit(lhs + chrono::Duration::nanoseconds(*d))), + BinaryOperator::Sub => Ok(lit(lhs - chrono::Duration::nanoseconds(*d))), + _ => error::expr(format!( + "invalid operator '{op}' for timestamp and duration: expected +, -" + )), + }, + Expr::Literal(Literal::Integer(_)) + // NOTE: This is a slight deviation from InfluxQL, for which the only valid binary + // operator for two timestamps is subtraction. By converting the timestamp to a + // duration and calling this function recursively, we permit the addition operator. + | Expr::Literal(Literal::Timestamp(_)) + | Expr::Literal(Literal::String(_)) => { + reduce_binary_lhs_timestamp(ctx, lhs, op, expr_to_duration(ctx, rhs)?) + } + _ => error::expr(format!( + "invalid expression '{rhs}': expected duration, integer or timestamp string" + )), + } +} + +fn expr_to_duration(ctx: &ReduceContext, expr: Expr) -> ExprResult { + Ok(lit(match expr { + Expr::Literal(Literal::Duration(v)) => v, + Expr::Literal(Literal::Integer(v)) => Duration(v), + Expr::Literal(Literal::Timestamp(v)) => Duration(v.timestamp_nanos()), + Expr::Literal(Literal::String(v)) => { + Duration(parse_timestamp_nanos(&v, ctx.tz)?.timestamp_nanos()) + } + _ => return error::expr(format!("unable to cast {expr} to duration")), + })) +} + +/// Reduce `string OP expr`. +/// +/// If `expr` is a string, concatenates the two values and returns a new string. +/// If `expr` is a duration, integer or timestamp, the left-hand +/// string is parsed as a timestamp and the expression evaluated as +/// `timestamp OP expr` +fn reduce_binary_lhs_string( + ctx: &ReduceContext, + lhs: String, + op: BinaryOperator, + rhs: Expr, +) -> ExprResult { + match rhs { + Expr::Literal(Literal::String(ref s)) => match op { + // concatenate the two strings + BinaryOperator::Add => Ok(lit(lhs + s)), + _ => reduce_binary_lhs_timestamp(ctx, parse_timestamp_nanos(&lhs, ctx.tz)?, op, rhs), + }, + Expr::Literal(Literal::Duration(_)) + | Expr::Literal(Literal::Timestamp(_)) + | Expr::Literal(Literal::Integer(_)) => { + reduce_binary_lhs_timestamp(ctx, parse_timestamp_nanos(&lhs, ctx.tz)?, op, rhs) + } + _ => error::expr(format!( + "found '{rhs}', expected duration, integer or timestamp string" + )), + } +} + +/// Returns true if the conditional expression is a single node that +/// refers to the `time` column. +/// +/// In a conditional expression, this comparison is case-insensitive per the [Go implementation][go] +/// +/// [go]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L5751-L5753 +fn is_time_field(cond: &ConditionalExpression) -> bool { + if let ConditionalExpression::Expr(expr) = cond { + if let Expr::VarRef(VarRef { ref name, .. }) = **expr { + name.eq_ignore_ascii_case("time") + } else { + false + } + } else { + false + } +} + +fn parse_timestamp_nanos(s: &str, tz: Option) -> Result { + parse_timestamp(s, tz) + .ok_or_else(|| error::map::expr(format!("'{s}' is not a valid timestamp"))) +} + +/// Parse s as a timestamp in the specified timezone and return the timestamp +/// as a literal timestamp expression. +fn parse_timestamp_expr(s: &str, tz: Option) -> ExprResult { + Ok(Expr::Literal(Literal::Timestamp(parse_timestamp_nanos( + s, tz, + )?))) +} + +fn map_expr_err(expr: &Expr) -> impl Fn(ExprError) -> ExprError + '_ { + move |err| { + error::map::expr(format!( + "invalid expression \"{expr}\": {}", + match err { + ExprError::Expression(str) | ExprError::Internal(str) => str, + } + )) + } +} + +#[cfg(test)] +mod test { + use crate::expression::ConditionalExpression; + use crate::time_range::{ + duration_expr_to_nanoseconds, reduce_time_expr, split_cond, ExprError, ExprResult, + ReduceContext, + }; + use crate::timestamp::Timestamp; + use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Offset, Utc}; + use test_helpers::assert_error; + + #[ignore] + #[test] + fn test_split_cond() { + let cond: ConditionalExpression = "time > now() - 1h".parse().unwrap(); + let (cond, time) = split_cond(&cond); + println!("{cond:?}, {time:?}"); + } + + #[test] + fn test_rewrite_time_expression_no_timezone() { + fn process_expr(s: &str) -> ExprResult { + let cond: ConditionalExpression = + s.parse().expect("unexpected error parsing expression"); + let ctx = ReduceContext { + now: Some(Timestamp::from_utc( + NaiveDateTime::new( + NaiveDate::from_ymd_opt(2004, 4, 9).unwrap(), + NaiveTime::from_hms_opt(12, 13, 14).unwrap(), + ), + Utc.fix(), + )), + tz: None, + }; + reduce_time_expr(&ctx, cond.expr().unwrap()) + } + + macro_rules! assert_expr { + ($S: expr, $EXPECTED: expr) => { + let expr = process_expr($S).unwrap(); + assert_eq!(expr.to_string(), $EXPECTED); + }; + } + + // + // Valid literals + // + + // Duration + assert_expr!("1d", "1970-01-02T00:00:00+00:00"); + + // Single integer interpreted as a Unix nanosecond epoch + assert_expr!("1157082310000000000", "2006-09-01T03:45:10+00:00"); + + // Single float interpreted as a Unix nanosecond epoch + assert_expr!("1157082310000000000.0", "2006-09-01T03:45:10+00:00"); + + // Single string interpreted as a timestamp + assert_expr!( + "'2004-04-09 02:33:45.123456789'", + "2004-04-09T02:33:45.123456789+00:00" + ); + + // now + assert_expr!("now()", "2004-04-09T12:13:14+00:00"); + + // + // Expressions + // + + // now() OP expr + assert_expr!("now() - 5m", "2004-04-09T12:08:14+00:00"); + assert_expr!("(now() - 5m)", "2004-04-09T12:08:14+00:00"); + assert_expr!("now() - 5m - 60m", "2004-04-09T11:08:14+00:00"); + assert_expr!("now() - 500", "2004-04-09T12:13:13.999999500+00:00"); + assert_expr!("now() - (5m + 60m)", "2004-04-09T11:08:14+00:00"); + + // expr OP now() + assert_expr!("5m + now()", "2004-04-09T12:18:14+00:00"); + + // duration OP expr + assert_expr!("1w3d + 1d", "1970-01-12T00:00:00+00:00"); + assert_expr!("1w3d - 1d", "1970-01-10T00:00:00+00:00"); + + // string OP expr + assert_expr!("'2004-04-09' - '2004-04-08'", "1970-01-02T00:00:00+00:00"); + + assert_expr!("'2004-04-09' + '02:33:45'", "2004-04-09T02:33:45+00:00"); + + // integer OP expr + assert_expr!("1157082310000000000 - 1s", "2006-09-01T03:45:09+00:00"); + + // nested evaluation order + assert_expr!("now() - (6m - (1m * 5))", r#"2004-04-09T12:12:14+00:00"#); + + // Fallible + + use super::ExprError::Expression; + assert_error!(process_expr("foo + 1"), Expression(ref s) if s == "invalid expression \"foo + 1\": found symbol 'foo', expected now() or a literal duration, float, integer and timestamp string"); + + assert_error!(process_expr("5m - now()"), Expression(ref s) if s == "invalid expression \"5m - now()\": unexpected literal '2004-04-09T12:13:14+00:00' for duration expression"); + + assert_error!(process_expr("'2004-04-09' + false"), Expression(ref s) if s == "invalid expression \"'2004-04-09' + false\": found literal 'false', expected duration, float, integer, or timestamp string"); + + assert_error!(process_expr("1s * 1s"), Expression(ref s) if s == "invalid expression \"1000ms * 1000ms\": found operator '*', expected +, -"); + assert_error!(process_expr("1s + 0.5"), Expression(ref s) if s == "invalid expression \"1000ms + 0.5\": found operator '+', expected *, /"); + + assert_error!(process_expr("'2004-04-09T'"), Expression(ref s) if s == "invalid expression \"'2004-04-09T'\": '2004-04-09T' is not a valid timestamp"); + assert_error!(process_expr("now() * 1"), Expression(ref s) if s == "invalid expression \"now() * 1\": invalid operator '*' for timestamp and duration: expected +, -"); + assert_error!(process_expr("'2' + now()"), Expression(ref s) if s == "invalid expression \"'2' + now()\": '2' is not a valid timestamp"); + assert_error!(process_expr("'2' + '3'"), Expression(ref s) if s == "invalid expression \"'2' + '3'\": '23' is not a valid timestamp"); + assert_error!(process_expr("'2' + '3' + 10s"), Expression(ref s) if s == "invalid expression \"'2' + '3' + 10s\": '23' is not a valid timestamp"); + } + + #[test] + fn test_rewrite_time_expression_with_timezone() { + fn process_expr(s: &str) -> ExprResult { + let cond: ConditionalExpression = + s.parse().expect("unexpected error parsing expression"); + let ctx = ReduceContext { + now: None, + tz: Some(chrono_tz::Australia::Hobart), + }; + reduce_time_expr(&ctx, cond.expr().unwrap()) + } + + macro_rules! assert_expr { + ($S: expr, $EXPECTED: expr) => { + let expr = process_expr($S).unwrap(); + assert_eq!(expr.to_string(), $EXPECTED); + }; + } + + assert_expr!( + "'2004-04-09 10:05:00.123456789'", + "2004-04-09T10:05:00.123456789+10:00" + ); + assert_expr!("'2004-04-09'", "2004-04-09T00:00:00+10:00"); + assert_expr!( + "'2004-04-09T10:05:00.123456789Z'", + "2004-04-09T20:05:00.123456789+10:00" + ); + } + + #[test] + fn test_expr_to_duration() { + fn parse(s: &str) -> Result { + let expr = s + .parse::() + .unwrap() + .expr() + .unwrap() + .clone(); + duration_expr_to_nanoseconds(&expr) + } + + let cases = vec![ + ("10s", 10_000_000_000_i64), + ("10s + 1d", 86_410_000_000_000), + ("5d10ms", 432_000_010_000_000), + ("-2d10ms", -172800010000000), + ("-2d10ns", -172800000000010), + ]; + + for (interval_str, exp) in cases { + let got = parse(interval_str).unwrap(); + assert_eq!(got, exp, "Actual: {got:?}"); + } + } +} diff --git a/iox_query_influxql/src/plan/timestamp.rs b/influxdb_influxql_parser/src/timestamp.rs similarity index 95% rename from iox_query_influxql/src/plan/timestamp.rs rename to influxdb_influxql_parser/src/timestamp.rs index 35b5e65f9b..69429d072e 100644 --- a/iox_query_influxql/src/plan/timestamp.rs +++ b/influxdb_influxql_parser/src/timestamp.rs @@ -1,9 +1,12 @@ -use crate::plan::error; +//! Parse InfluxQL timestamp strings. +//! use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Offset, TimeZone}; -use datafusion::common::Result; + +/// Represents an InfluxQL timestamp. +pub type Timestamp = DateTime; /// Parse the timestamp string and return a DateTime in UTC. -fn parse_timestamp_utc(s: &str) -> Result> { +fn parse_timestamp_utc(s: &str) -> Option { // 1a. Try a date time format string with nanosecond precision and then without // https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L3661 NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") @@ -20,11 +23,11 @@ fn parse_timestamp_utc(s: &str) -> Result> { .map(|nd| nd.and_time(NaiveTime::default())), ) .map(|ts| DateTime::from_utc(ts, chrono::Utc.fix())) - .map_err(|_| error::map::query("invalid timestamp string")) + .ok() } /// Parse the timestamp string and return a DateTime in the specified timezone. -fn parse_timestamp_tz(s: &str, tz: chrono_tz::Tz) -> Result> { +fn parse_timestamp_tz(s: &str, tz: chrono_tz::Tz) -> Option { // 1a. Try a date time format string with nanosecond precision // https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L3661 tz.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.f") @@ -51,7 +54,7 @@ fn parse_timestamp_tz(s: &str, tz: chrono_tz::Tz) -> Result Result) -> Result> { +pub fn parse_timestamp(s: &str, tz: Option) -> Option { match tz { Some(tz) => parse_timestamp_tz(s, tz), // We could have mapped None => Utc and called parse_timestamp_tz, however, diff --git a/iox_query_influxql/Cargo.toml b/iox_query_influxql/Cargo.toml index 491cd32634..88dc9a9c68 100644 --- a/iox_query_influxql/Cargo.toml +++ b/iox_query_influxql/Cargo.toml @@ -7,7 +7,6 @@ license.workspace = true [dependencies] arrow = { workspace = true, features = ["prettyprint"] } -chrono = { version = "0.4", default-features = false } chrono-tz = { version = "0.8" } datafusion = { workspace = true } datafusion_util = { path = "../datafusion_util" } @@ -25,6 +24,7 @@ thiserror = "1.0" workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] # In alphabetical order +chrono = { version = "0.4", default-features = false } test_helpers = { path = "../test_helpers" } assert_matches = "1" insta = { version = "1", features = ["yaml"] } diff --git a/iox_query_influxql/src/plan/error.rs b/iox_query_influxql/src/plan/error.rs index 6b12535ec9..c952521f9c 100644 --- a/iox_query_influxql/src/plan/error.rs +++ b/iox_query_influxql/src/plan/error.rs @@ -19,6 +19,7 @@ pub(crate) fn not_implemented(feature: impl Into) -> Result { /// making them convenient to use with functions like `map_err`. pub(crate) mod map { use datafusion::common::DataFusionError; + use influxdb_influxql_parser::time_range::ExprError; use thiserror::Error; #[derive(Debug, Error)] @@ -47,6 +48,14 @@ pub(crate) mod map { DataFusionError::NotImplemented(feature.into()) } + /// Map an [`ExprError`] to a DataFusion error. + pub(crate) fn expr_error(err: ExprError) -> DataFusionError { + match err { + ExprError::Expression(s) => query(s), + ExprError::Internal(s) => internal(s), + } + } + #[cfg(test)] mod test { use crate::plan::error::map::PlannerError; diff --git a/iox_query_influxql/src/plan/influxql_time_range_expression.rs b/iox_query_influxql/src/plan/influxql_time_range_expression.rs index 31bf1b1d5a..8090641e73 100644 --- a/iox_query_influxql/src/plan/influxql_time_range_expression.rs +++ b/iox_query_influxql/src/plan/influxql_time_range_expression.rs @@ -1,588 +1,16 @@ //! APIs for transforming InfluxQL [expressions][influxdb_influxql_parser::expression::Expr]. use crate::plan::error; -use crate::plan::timestamp::parse_timestamp; -use crate::plan::util::binary_operator_to_df_operator; -use datafusion::common::{DataFusionError, Result, ScalarValue}; -use datafusion::logical_expr::{binary_expr, lit, now, BinaryExpr, Expr as DFExpr, Operator}; -use influxdb_influxql_parser::expression::{Binary, BinaryOperator, Call}; -use influxdb_influxql_parser::functions::is_now_function; -use influxdb_influxql_parser::{expression::Expr, literal::Literal}; +use datafusion::common::{Result, ScalarValue}; +use datafusion::logical_expr::{lit, Expr as DFExpr}; +use influxdb_influxql_parser::expression::Expr; +use influxdb_influxql_parser::time_range::duration_expr_to_nanoseconds; type ExprResult = Result; -/// Transform an InfluxQL expression, to a DataFusion logical [`Expr`][DFExpr], -/// applying rules specific to time-range expressions. When possible, literal values are folded. -/// -/// ## NOTEs -/// -/// The rules applied to this transformation are determined from -/// the Go InfluxQL parser and treated as the source of truth in the -/// absence of an official specification. Most of the implementation -/// is sourced from the [`getTimeRange`][] and [`Reduce`][] functions. -/// -/// A [time-range][] expression is determined when either the left or right -/// hand side of a [`ConditionalExpression`][influxdb_influxql_parser::expression::ConditionalExpression] -/// has a single node that refers to a `time` field. Whilst most of InfluxQL -/// performs comparisons of fields using case-sensitive matches, this is a -/// case-insensitive match, per the [`conditionExpr`][conditionExpr] function. -/// -/// Binary expressions, where the left and right hand sides are strings, are -/// treated as a string concatenation operation. All other expressions are -/// treated as arithmetic expressions. -/// -/// Literal values interpreted as follows: -/// -/// * single-quoted strings are interpreted as timestamps when either the left or right -/// hand side of the binary expression is numeric. -/// * integer and float values as nanosecond offsets from the Unix epoch. -/// * The Go implementation may interpret a number as a timestamp or duration, -/// depending on context, however, in reality both are just offsets from the Unix epoch. -/// -/// [time range]: https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#absolute-time -/// [`getTimeRange`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L5788-L5791 -/// [`Reduce`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4850-L4852 -/// [conditionExpr]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L5751-L5756 -/// [`TZ`]: https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#the-time-zone-clause -pub(in crate::plan) fn time_range_to_df_expr(expr: &Expr, tz: Option) -> ExprResult { - let df_expr = reduce_expr(expr, tz)?; - - // Attempt to coerce the final expression into a timestamp - Ok(match df_expr { - // timestamp literals require no transformation and Call has - // already been validated as a now() function call. - DFExpr::Literal(ScalarValue::TimestampNanosecond(..)) - | DFExpr::ScalarFunction { .. } - | DFExpr::BinaryExpr { .. } => df_expr, - DFExpr::Literal(ScalarValue::Utf8(Some(s))) => { - parse_timestamp_df_expr(&s, tz).map_err(map_expr_err(expr))? - } - DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(d))) => { - DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(d as i64), None)) - } - DFExpr::Literal(ScalarValue::Float64(Some(v))) => { - DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(v as i64), None)) - } - DFExpr::Literal(ScalarValue::Int64(Some(v))) => { - DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(v), None)) - } - _ => return error::query("invalid time range expression"), - }) -} - /// Simplifies `expr` to an InfluxQL duration and returns a DataFusion interval. /// /// Returns an error if `expr` is not a duration expression. pub(super) fn expr_to_df_interval_dt(expr: &Expr) -> ExprResult { - let ns = duration_expr_to_nanoseconds(expr)?; + let ns = duration_expr_to_nanoseconds(expr).map_err(error::map::expr_error)?; Ok(lit(ScalarValue::new_interval_mdn(0, 0, ns))) } - -/// Reduces an InfluxQL duration `expr` to a nanosecond interval. -pub(super) fn duration_expr_to_nanoseconds(expr: &Expr) -> Result { - let df_expr = reduce_expr(expr, None)?; - match df_expr { - DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(v))) => Ok(v as i64), - DFExpr::Literal(ScalarValue::Float64(Some(v))) => Ok(v as i64), - DFExpr::Literal(ScalarValue::Int64(Some(v))) => Ok(v), - _ => error::query("invalid duration expression"), - } -} - -fn map_expr_err(expr: &Expr) -> impl Fn(DataFusionError) -> DataFusionError + '_ { - move |err| { - error::map::query(format!( - "invalid expression \"{}\": {}", - expr, - match err { - DataFusionError::Plan(str) => str, - _ => err.to_string(), - } - )) - } -} - -fn reduce_expr(expr: &Expr, tz: Option) -> ExprResult { - match expr { - Expr::Binary(v) => reduce_binary_expr(v, tz).map_err(map_expr_err(expr)), - Expr::Call (Call { name, .. }) => { - if !is_now_function(name) { - return error::query( - format!("invalid function call '{name}'"), - ); - } - Ok(now()) - } - Expr::Nested(expr) => reduce_expr(expr, tz), - Expr::Literal(val) => match val { - Literal::Integer(v) => Ok(lit(*v)), - Literal::Float(v) => Ok(lit(*v)), - Literal::String(v) => Ok(lit(v.clone())), - Literal::Timestamp(v) => Ok(lit(ScalarValue::TimestampNanosecond( - Some(v.timestamp_nanos()), - None, - ))), - Literal::Duration(v) => Ok(lit(ScalarValue::new_interval_mdn(0, 0, **v))), - _ => error::query(format!( - "found literal '{val}', expected duration, float, integer, or timestamp string" - )), - }, - - Expr::VarRef { .. } | Expr::BindParameter(_) | Expr::Wildcard(_) | Expr::Distinct(_) => error::query(format!( - "found symbol '{expr}', expected now() or a literal duration, float, integer and timestamp string" - )), - } -} - -fn reduce_binary_expr(expr: &Binary, tz: Option) -> ExprResult { - let lhs = reduce_expr(&expr.lhs, tz)?; - let op = expr.op; - let rhs = reduce_expr(&expr.rhs, tz)?; - - match lhs { - DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(v))) => { - reduce_binary_lhs_duration_df_expr(v, op, &rhs, tz) - } - DFExpr::Literal(ScalarValue::Int64(Some(v))) => { - reduce_binary_lhs_integer_df_expr(v, op, &rhs, tz) - } - DFExpr::Literal(ScalarValue::Float64(Some(v))) => { - reduce_binary_lhs_float_df_expr(v, op, &rhs) - } - DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(v), _)) => { - reduce_binary_lhs_timestamp_df_expr(v, op, &rhs, tz) - } - DFExpr::ScalarFunction { .. } => { - reduce_binary_scalar_df_expr(&lhs, op, &expr_to_interval_df_expr(&rhs, tz)?) - } - DFExpr::Literal(ScalarValue::Utf8(Some(v))) => { - reduce_binary_lhs_string_df_expr(&v, op, &rhs, tz) - } - _ => Ok(DFExpr::BinaryExpr(BinaryExpr { - left: Box::new(lhs), - op: binary_operator_to_df_operator(op), - right: Box::new(rhs), - })), - } -} - -/// Reduce `duration OP expr`. -/// -/// ```text -/// duration = duration ( ADD | SUB ) ( duration | NOW() ) -/// duration = duration ( MUL | DIV ) ( float | integer ) -/// timestamp = duration ADD string -/// timestamp = duration ADD timestamp -/// ``` -fn reduce_binary_lhs_duration_df_expr( - lhs: i128, - op: BinaryOperator, - rhs: &DFExpr, - tz: Option, -) -> Result { - match rhs { - DFExpr::Literal(val) => match val { - // durations may be added and subtracted from other durations - ScalarValue::IntervalMonthDayNano(Some(d)) => match op { - BinaryOperator::Add => { - Ok(lit(ScalarValue::new_interval_mdn(0, 0, (lhs + *d) as i64))) - } - BinaryOperator::Sub => { - Ok(lit(ScalarValue::new_interval_mdn(0, 0, (lhs - *d) as i64))) - } - _ => error::query(format!("found operator '{op}', expected +, -")), - }, - // durations may only be scaled by float literals - ScalarValue::Float64(Some(v)) => { - reduce_binary_lhs_duration_df_expr(lhs, op, &lit(*v as i64), tz) - } - ScalarValue::Int64(Some(v)) => match op { - BinaryOperator::Mul => { - Ok(lit(ScalarValue::new_interval_mdn(0, 0, lhs as i64 * *v))) - } - BinaryOperator::Div => { - Ok(lit(ScalarValue::new_interval_mdn(0, 0, lhs as i64 / *v))) - } - _ => error::query(format!("found operator '{op}', expected *, /")), - }, - // A timestamp may be added to a duration - ScalarValue::TimestampNanosecond(Some(v), _) if matches!(op, BinaryOperator::Add) => { - Ok(lit(ScalarValue::TimestampNanosecond( - Some(*v + lhs as i64), - None, - ))) - } - ScalarValue::Utf8(Some(s)) => { - reduce_binary_lhs_duration_df_expr(lhs, op, &parse_timestamp_df_expr(s, tz)?, tz) - } - // This should not occur, as all the DataFusion literal values created by this process - // are handled above. - _ => error::internal(format!( - "unexpected DataFusion literal '{rhs}' for duration expression" - )), - }, - DFExpr::ScalarFunction { .. } => reduce_binary_scalar_df_expr( - &expr_to_interval_df_expr(&lit(ScalarValue::new_interval_mdn(0, 0, lhs as i64)), tz)?, - op, - rhs, - ), - _ => error::query("invalid duration expression"), - } -} - -/// Reduce `integer OP expr`. -/// -/// ```text -/// integer = integer ( ADD | SUB | MUL | DIV | MOD | BitwiseAND | BitwiseOR | BitwiseXOR ) integer -/// float = integer as float OP float -/// timestamp = integer as timestamp OP duration -/// ``` -fn reduce_binary_lhs_integer_df_expr( - lhs: i64, - op: BinaryOperator, - rhs: &DFExpr, - tz: Option, -) -> ExprResult { - match rhs { - DFExpr::Literal(ScalarValue::Float64(Some(_))) => { - reduce_binary_lhs_float_df_expr(lhs as f64, op, rhs) - } - DFExpr::Literal(ScalarValue::Int64(Some(v))) => Ok(lit(op.reduce(lhs, *v))), - DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(_))) => { - reduce_binary_lhs_timestamp_df_expr(lhs, op, rhs, tz) - } - DFExpr::ScalarFunction { .. } | DFExpr::Literal(ScalarValue::TimestampNanosecond(..)) => { - reduce_binary_lhs_duration_df_expr(lhs.into(), op, rhs, tz) - } - DFExpr::Literal(ScalarValue::Utf8(Some(s))) => { - reduce_binary_lhs_duration_df_expr(lhs.into(), op, &parse_timestamp_df_expr(s, tz)?, tz) - } - _ => error::query("invalid integer expression"), - } -} - -/// Reduce `float OP expr`. -/// -/// ```text -/// float = float ( ADD | SUB | MUL | DIV | MOD ) ( float | integer) -/// ``` -fn reduce_binary_lhs_float_df_expr(lhs: f64, op: BinaryOperator, rhs: &DFExpr) -> ExprResult { - Ok(lit(match rhs { - DFExpr::Literal(ScalarValue::Float64(Some(rhs))) => op - .try_reduce(lhs, *rhs) - .ok_or_else(|| error::map::query("invalid operator for float expression"))?, - DFExpr::Literal(ScalarValue::Int64(Some(rhs))) => op - .try_reduce(lhs, *rhs) - .ok_or_else(|| error::map::query("invalid operator for float expression"))?, - _ => return error::query("invalid float expression"), - })) -} - -/// Reduce `timestamp OP expr`. -/// -/// The right-hand `expr` must be of a type that can be -/// coalesced to a duration, which includes a `duration`, `integer` or a -/// `string`. A `string` is parsed as a timestamp an interpreted as -/// the number of nanoseconds from the Unix epoch. -/// -/// ```text -/// timestamp = timestamp ( ADD | SUB ) ( duration | integer | string | timestamp ) -/// ``` -fn reduce_binary_lhs_timestamp_df_expr( - lhs: i64, - op: BinaryOperator, - rhs: &DFExpr, - tz: Option, -) -> ExprResult { - match rhs { - DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(d))) => match op { - BinaryOperator::Add => Ok(lit(ScalarValue::TimestampNanosecond(Some(lhs + *d as i64), None))), - BinaryOperator::Sub => Ok(lit(ScalarValue::TimestampNanosecond(Some(lhs - *d as i64), None))), - _ => error::query( - format!("invalid operator '{op}' for timestamp and duration: expected +, -"), - ), - } - DFExpr::Literal(ScalarValue::Int64(_)) - // NOTE: This is a slight deviation from InfluxQL, for which the only valid binary - // operator for two timestamps is subtraction. By converting the timestamp to a - // duration and calling this function recursively, we permit the addition operator. - | DFExpr::Literal(ScalarValue::TimestampNanosecond(..)) - | DFExpr::Literal(ScalarValue::Utf8(_)) => reduce_binary_lhs_timestamp_df_expr( - lhs, - op, - &expr_to_interval_df_expr(rhs, tz)?, - tz, - ), - _ => error::query( - format!("invalid expression '{rhs}': expected duration, integer or timestamp string"), - ), - } -} - -/// Reduce `expr ( + | - ) expr`. -/// -/// This API is called when either the left or right hand expression is -/// a scalar function and ensures the operator is either addition or subtraction. -fn reduce_binary_scalar_df_expr(lhs: &DFExpr, op: BinaryOperator, rhs: &DFExpr) -> ExprResult { - match op { - BinaryOperator::Add => Ok(binary_expr(lhs.clone(), Operator::Plus, rhs.clone())), - BinaryOperator::Sub => Ok(binary_expr(lhs.clone(), Operator::Minus, rhs.clone())), - _ => error::query(format!("found operator '{op}', expected +, -")), - } -} - -/// Converts `rhs` to a DataFusion interval literal. -fn expr_to_interval_df_expr(expr: &DFExpr, tz: Option) -> ExprResult { - Ok(lit(ScalarValue::new_interval_mdn( - 0, - 0, - match expr { - DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(d))) => *d as i64, - DFExpr::Literal(ScalarValue::Int64(Some(v))) => *v, - DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(v), _)) => *v, - DFExpr::Literal(ScalarValue::Utf8(Some(s))) => parse_timestamp_nanos(s, tz)?, - _ => return error::query(format!("unable to cast '{expr}' to duration")), - }, - ))) -} - -/// Reduce `string OP expr`. -/// -/// If `expr` is a string, concatenates the two values and returns a new string. -/// If `expr` is a duration, integer or timestamp, the left-hand -/// string is parsed as a timestamp and the expression evaluated as -/// `timestamp OP expr` -fn reduce_binary_lhs_string_df_expr( - lhs: &str, - op: BinaryOperator, - rhs: &DFExpr, - tz: Option, -) -> ExprResult { - match rhs { - DFExpr::Literal(ScalarValue::Utf8(Some(s))) => match op { - // concatenate the two strings - BinaryOperator::Add => Ok(lit(lhs.to_string() + s)), - _ => reduce_binary_lhs_timestamp_df_expr(parse_timestamp_nanos(lhs, tz)?, op, rhs, tz), - }, - DFExpr::Literal(ScalarValue::IntervalMonthDayNano(_)) - | DFExpr::Literal(ScalarValue::TimestampNanosecond(..)) - | DFExpr::Literal(ScalarValue::Int64(_)) => { - reduce_binary_lhs_timestamp_df_expr(parse_timestamp_nanos(lhs, tz)?, op, rhs, tz) - } - _ => error::query(format!( - "found '{rhs}', expected duration, integer or timestamp string" - )), - } -} - -fn parse_timestamp_nanos(s: &str, tz: Option) -> Result { - parse_timestamp(s, tz) - .map(|ts| ts.timestamp_nanos()) - .map_err(|_| error::map::query(format!("'{s}' is not a valid timestamp"))) -} - -/// Parse s as a timestamp in the specified timezone and return the timestamp -/// as a literal timestamp expression. -fn parse_timestamp_df_expr(s: &str, tz: Option) -> ExprResult { - Ok(lit(ScalarValue::TimestampNanosecond( - Some(parse_timestamp_nanos(s, tz)?), - None, - ))) -} - -#[cfg(test)] -mod test { - use super::*; - use influxdb_influxql_parser::expression::ConditionalExpression; - use test_helpers::assert_error; - - #[test] - fn test_rewrite_time_expression_no_timezone() { - fn process_expr(s: &str) -> ExprResult { - let cond: ConditionalExpression = - s.parse().expect("unexpected error parsing expression"); - time_range_to_df_expr(cond.expr().unwrap(), None) - } - - macro_rules! assert_expr { - ($S: expr, $EXPECTED: expr) => { - let expr = process_expr($S).unwrap(); - assert_eq!(expr.to_string(), $EXPECTED); - }; - } - - // - // Valid literals - // - - // Duration - assert_expr!("1d", "TimestampNanosecond(86400000000000, None)"); - - // Single integer interpreted as a Unix nanosecond epoch - assert_expr!( - "1157082310000000000", - "TimestampNanosecond(1157082310000000000, None)" - ); - - // Single float interpreted as a Unix nanosecond epoch - assert_expr!( - "1157082310000000000.0", - "TimestampNanosecond(1157082310000000000, None)" - ); - - // Single string interpreted as a timestamp - assert_expr!( - "'2004-04-09 02:33:45.123456789'", - "TimestampNanosecond(1081478025123456789, None)" - ); - - // now - assert_expr!("now()", "now()"); - - // - // Expressions - // - - // now() OP expr - assert_expr!( - "now() - 5m", - r#"now() - IntervalMonthDayNano("300000000000")"# - ); - assert_expr!( - "(now() - 5m)", - r#"now() - IntervalMonthDayNano("300000000000")"# - ); - assert_expr!( - "now() - 5m - 60m", - r#"now() - IntervalMonthDayNano("300000000000") - IntervalMonthDayNano("3600000000000")"# - ); - assert_expr!("now() - 500", r#"now() - IntervalMonthDayNano("500")"#); - assert_expr!( - "now() - (5m + 60m)", - r#"now() - IntervalMonthDayNano("3900000000000")"# - ); - - // expr OP now() - assert_expr!( - "5m - now()", - r#"IntervalMonthDayNano("300000000000") - now()"# - ); - assert_expr!( - "5m + now()", - r#"IntervalMonthDayNano("300000000000") + now()"# - ); - - // duration OP expr - assert_expr!("1w3d + 1d", "TimestampNanosecond(950400000000000, None)"); - assert_expr!("1w3d - 1d", "TimestampNanosecond(777600000000000, None)"); - - // string OP expr - assert_expr!( - "'2004-04-09' - '2004-04-08'", - "TimestampNanosecond(86400000000000, None)" - ); - - assert_expr!( - "'2004-04-09' + '02:33:45'", - "TimestampNanosecond(1081478025000000000, None)" - ); - - // integer OP expr - assert_expr!( - "1157082310000000000 - 1s", - "TimestampNanosecond(1157082309000000000, None)" - ); - - // nested evaluation order - assert_expr!( - "now() - (6m - (1m * 5))", - r#"now() - IntervalMonthDayNano("60000000000")"# - ); - - // Fallible - - use DataFusionError::Plan; - assert_error!(process_expr("foo + 1"), Plan(ref s) if s == "invalid expression \"foo + 1\": found symbol 'foo', expected now() or a literal duration, float, integer and timestamp string"); - - assert_error!(process_expr("'2004-04-09' + false"), Plan(ref s) if s == "invalid expression \"'2004-04-09' + false\": found literal 'false', expected duration, float, integer, or timestamp string"); - - assert_error!(process_expr("1s * 1s"), Plan(ref s) if s == "invalid expression \"1000ms * 1000ms\": found operator '*', expected +, -"); - assert_error!(process_expr("1s + 0.5"), Plan(ref s) if s == "invalid expression \"1000ms + 0.5\": found operator '+', expected *, /"); - - assert_error!(process_expr("'2004-04-09T'"), Plan(ref s) if s == "invalid expression \"'2004-04-09T'\": '2004-04-09T' is not a valid timestamp"); - assert_error!(process_expr("now() + now()"), Plan(ref s) if s == "invalid expression \"now() + now()\": unable to cast 'now()' to duration"); - assert_error!(process_expr("now() * 1"), Plan(ref s) if s == "invalid expression \"now() * 1\": found operator '*', expected +, -"); - assert_error!(process_expr("'2' + now()"), Plan(ref s) if s == "invalid expression \"'2' + now()\": found 'now()', expected duration, integer or timestamp string"); - assert_error!(process_expr("'2' + '3'"), Plan(ref s) if s == "invalid expression \"'2' + '3'\": '23' is not a valid timestamp"); - assert_error!(process_expr("'2' + '3' + 10s"), Plan(ref s) if s == "invalid expression \"'2' + '3' + 10s\": '23' is not a valid timestamp"); - } - - #[test] - fn test_rewrite_time_expression_with_timezone() { - fn process_expr(s: &str) -> ExprResult { - let cond: ConditionalExpression = - s.parse().expect("unexpected error parsing expression"); - time_range_to_df_expr(cond.expr().unwrap(), Some(chrono_tz::Australia::Hobart)) - } - - macro_rules! assert_expr { - ($S: expr, $EXPECTED: expr) => { - let expr = process_expr($S).unwrap(); - assert_eq!(expr.to_string(), $EXPECTED); - }; - } - - assert_expr!( - "'2004-04-09 10:05:00.123456789'", - "TimestampNanosecond(1081469100123456789, None)" // 2004-04-09T00:05:00.123456789Z - ); - assert_expr!( - "'2004-04-09'", - "TimestampNanosecond(1081432800000000000, None)" // 2004-04-08T14:00:00Z - ); - assert_expr!( - "'2004-04-09T10:05:00.123456789Z'", - "TimestampNanosecond(1081505100123456789, None)" // 2004-04-09T10:05:00.123456789Z - ); - } - - #[test] - fn test_expr_to_df_interval_dt() { - fn parse(s: &str) -> ExprResult { - let expr = s - .parse::() - .unwrap() - .expr() - .unwrap() - .clone(); - expr_to_df_interval_dt(&expr) - } - - let cases = vec![ - ("10s", ScalarValue::new_interval_mdn(0, 0, 10_000_000_000)), - ( - "10s + 1d", - ScalarValue::new_interval_mdn(0, 0, 86_410_000_000_000), - ), - ( - "5d10ms", - ScalarValue::new_interval_mdn(0, 0, 432_000_010_000_000), - ), - ( - "-2d10ms", - ScalarValue::new_interval_mdn(0, 0, -172800010000000), - ), - ( - "-2d10ns", - ScalarValue::new_interval_mdn(0, 0, -172800000000010), - ), - ]; - - for (interval_str, expected_scalar) in cases { - let parsed_interval = parse(interval_str).unwrap(); - let DFExpr::Literal(actual_scalar) = parsed_interval else { - panic!("Expected literal Expr, got {parsed_interval:?}"); - }; - assert_eq!(actual_scalar, expected_scalar, "Actual: {actual_scalar:?}"); - } - } -} diff --git a/iox_query_influxql/src/plan/mod.rs b/iox_query_influxql/src/plan/mod.rs index b9831f27f2..895e69822b 100644 --- a/iox_query_influxql/src/plan/mod.rs +++ b/iox_query_influxql/src/plan/mod.rs @@ -7,7 +7,6 @@ mod ir; mod planner; mod rewriter; mod test_utils; -mod timestamp; mod util; mod util_copy; mod var_ref; diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index 0af1ffe9a7..7a23b2aa9e 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -4,9 +4,7 @@ mod test_utils; mod time_range; use crate::plan::error; -use crate::plan::influxql_time_range_expression::{ - duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr, -}; +use crate::plan::influxql_time_range_expression::expr_to_df_interval_dt; use crate::plan::ir::{DataSource, Field, Select, SelectQuery}; use crate::plan::planner::select::{ fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort, ProjectionInfo, @@ -55,6 +53,10 @@ use influxdb_influxql_parser::show_measurements::{ use influxdb_influxql_parser::show_tag_keys::ShowTagKeysStatement; use influxdb_influxql_parser::show_tag_values::{ShowTagValuesStatement, WithKeyClause}; use influxdb_influxql_parser::simple_from_clause::ShowFromClause; +use influxdb_influxql_parser::time_range::{ + duration_expr_to_nanoseconds, reduce_time_expr, ReduceContext, +}; +use influxdb_influxql_parser::timestamp::Timestamp; use influxdb_influxql_parser::{ common::{MeasurementName, WhereClause}, expression::Expr as IQLExpr, @@ -633,7 +635,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { select_exprs[time_column_index] = if let Some(dim) = ctx.group_by.and_then(|gb| gb.time_dimension()) { let stride = expr_to_df_interval_dt(&dim.interval)?; let offset = if let Some(offset) = &dim.offset { - duration_expr_to_nanoseconds(offset)? + duration_expr_to_nanoseconds(offset).map_err(error::map::expr_error)? } else { 0 }; @@ -981,14 +983,31 @@ impl<'a> InfluxQLToLogicalPlan<'a> { return error::query("invalid time comparison operator: !="); } + let rc = ReduceContext { + now: Some(Timestamp::from( + self.s.execution_props().query_execution_start_time, + )), + tz: ctx.tz, + }; + if lhs_time { ( self.conditional_to_df_expr(ctx, lhs, schemas)?, - time_range_to_df_expr(find_expr(rhs)?, ctx.tz)?, + self.expr_to_df_expr( + ctx, + ExprScope::Where, + &reduce_time_expr(&rc, find_expr(rhs)?).map_err(error::map::expr_error)?, + schemas, + )?, ) } else { ( - time_range_to_df_expr(find_expr(lhs)?, ctx.tz)?, + self.expr_to_df_expr( + ctx, + ExprScope::Where, + &reduce_time_expr(&rc, find_expr(lhs)?).map_err(error::map::expr_error)?, + schemas, + )?, self.conditional_to_df_expr(ctx, rhs, schemas)?, ) } @@ -1075,7 +1094,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { Literal::String(v) => Ok(lit(v)), Literal::Boolean(v) => Ok(lit(*v)), Literal::Timestamp(v) => Ok(lit(ScalarValue::TimestampNanosecond( - Some(v.timestamp()), + Some(v.timestamp_nanos()), None, ))), Literal::Duration(_) => error::not_implemented("duration literal"), diff --git a/iox_query_influxql/src/plan/planner/time_range.rs b/iox_query_influxql/src/plan/planner/time_range.rs index 193d7d734a..1f5504a898 100644 --- a/iox_query_influxql/src/plan/planner/time_range.rs +++ b/iox_query_influxql/src/plan/planner/time_range.rs @@ -21,7 +21,7 @@ use std::collections::Bound; /// /// Combining relational operators like `time > now() - 5s` and equality /// operators like `time = ` with a disjunction (`OR`) -/// will evaluate to false, like InfluxQL. +/// will evaluate to `false`, like InfluxQL. /// /// # Background /// @@ -214,7 +214,11 @@ pub fn rewrite_time_range_exprs( let lhs = if let Some(expr) = rw.time_range.as_expr() { Some(expr) } else if !rw.equality.is_empty() { - disjunction(rw.equality) + disjunction(rw.equality.iter().map(|t| { + "time" + .as_expr() + .eq(lit(ScalarValue::TimestampNanosecond(Some(*t), None))) + })) } else { None }; @@ -263,7 +267,8 @@ fn is_time_range(expr: &Expr) -> bool { } /// Represents the lower bound, in nanoseconds, of a [`TimeRange`]. -pub struct LowerBound(Bound); +#[derive(Clone, Debug)] +struct LowerBound(Bound); impl LowerBound { /// Create a new, time bound that is unbounded @@ -349,7 +354,8 @@ impl Ord for LowerBound { } /// Represents the upper bound, in nanoseconds, of a [`TimeRange`]. -pub struct UpperBound(Bound); +#[derive(Clone, Debug)] +struct UpperBound(Bound); impl UpperBound { /// Create a new, unbounded upper bound. @@ -435,8 +441,8 @@ impl Ord for UpperBound { } /// Represents a time range, with a single lower and upper bound. -#[derive(Default, PartialEq, Eq)] -struct TimeRange { +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub(super) struct TimeRange { lower: LowerBound, upper: UpperBound, } @@ -466,7 +472,7 @@ struct SeparateTimeRange<'a> { /// /// ```sql /// time = '2004-04-09T12:00:00Z' - equality: Vec, + equality: Vec, } impl<'a> SeparateTimeRange<'a> { @@ -507,17 +513,17 @@ impl<'a> TreeNodeVisitor for SeparateTimeRange<'a> { op: op @ (Eq | NotEq | Gt | Lt | GtEq | LtEq), right, }) if is_time_column(left) | is_time_column(right) => { - self.stack.push(None); - - if matches!(op, Eq | NotEq) { - let node = self.simplifier.simplify(node.clone())?; - self.equality.push(node); - return Ok(VisitRecursion::Continue); + if matches!(op, NotEq) { + // Stop recursing, as != is an invalid operator for time expressions + return Ok(VisitRecursion::Stop); } + self.stack.push(None); + /// Op is the limited set of operators expected from here on, /// to avoid repeated wildcard match arms with unreachable!(). enum Op { + Eq, Gt, GtEq, Lt, @@ -526,20 +532,22 @@ impl<'a> TreeNodeVisitor for SeparateTimeRange<'a> { // Map the DataFusion Operator to Op let op = match op { + Eq => Op::Eq, Gt => Op::Gt, GtEq => Op::GtEq, Lt => Op::Lt, LtEq => Op::LtEq, - _ => unreachable!("expected: Gt | Lt | GtEq | LtEq"), + _ => unreachable!("expected: Eq | Gt | GtEq | Lt | LtEq"), }; let (expr, op) = if is_time_column(left) { (right, op) } else { - // swap the operators when the conditional is `expression OP "time"` ( left, match op { + Op::Eq => Op::Eq, + // swap the relational operators when the conditional is `expression OP "time"` Op::Gt => Op::Lt, Op::GtEq => Op::LtEq, Op::Lt => Op::Gt, @@ -562,6 +570,20 @@ impl<'a> TreeNodeVisitor for SeparateTimeRange<'a> { }; match op { + Op::Eq => { + if self.time_range.is_unbounded() { + self.equality.push(ts); + } else { + // Stop recursing, as we have observed incompatible + // time conditions using equality and relational operators + return Ok(VisitRecursion::Stop); + }; + } + Op::Gt | Op::GtEq | Op::Lt | Op::LtEq if !self.equality.is_empty() => { + // Stop recursing, as we have observed incompatible + // time conditions using equality and relational operators + return Ok(VisitRecursion::Stop); + } Op::Gt => { let ts = LowerBound::excluded(ts); if ts > self.time_range.lower { @@ -627,171 +649,202 @@ impl<'a> TreeNodeVisitor for SeparateTimeRange<'a> { #[cfg(test)] mod test { - use crate::plan::planner::test_utils::{execution_props, new_schemas}; use crate::plan::planner::time_range::{LowerBound, UpperBound}; - use datafusion::logical_expr::{binary_expr, lit, lit_timestamp_nano, now, Operator}; - use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; - use datafusion_util::AsExpr; - use std::sync::Arc; - #[test] - fn test_rewrite_time_range_exprs() { - use crate::plan::planner::time_range::rewrite_time_range_exprs; - use datafusion::common::ScalarValue as V; + // #[test] + // fn test_split_exprs() { + // use super::{LowerBound as L, TimeCondition as TC, UpperBound as U}; + // use datafusion::common::ScalarValue as V; + // + // let props = execution_props(); + // let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + // "time", + // (&InfluxColumnType::Timestamp).into(), + // false, + // )])); + // let df_schema = schema.to_dfschema_ref().unwrap(); + // let simplify_context = SimplifyContext::new(&props).with_schema(Arc::clone(&df_schema)); + // let simplifier = ExprSimplifier::new(simplify_context); + // + // use arrow::datatypes::{Field as ArrowField, Schema as ArrowSchema}; + // + // let split_exprs = |expr| super::split(expr, &simplifier).unwrap(); + // + // macro_rules! range { + // (lower=$LOWER:literal) => { + // TC::Range(TimeRange{lower: L::included($LOWER), upper: U::unbounded()}) + // }; + // (lower=$LOWER:literal, upper ex=$UPPER:literal) => { + // TC::Range(TimeRange{lower: L::included($LOWER), upper: U::excluded($UPPER)}) + // }; + // (lower ex=$LOWER:literal) => { + // TC::Range(TimeRange{lower: L::excluded($LOWER), upper: U::unbounded()}) + // }; + // (upper=$UPPER:literal) => { + // TC::Range(TimeRange{lower: L::unbounded(), upper: U::included($UPPER)}) + // }; + // (upper ex=$UPPER:literal) => { + // TC::Range(TimeRange{lower: L::unbounded(), upper: U::excluded($UPPER)}) + // }; + // (list=$($TS:literal),*) => { + // TC::List(vec![$($TS),*]) + // } + // } + // + // let expr = "time" + // .as_expr() + // .gt_eq(now() - lit(V::new_interval_dt(0, 1000))); + // let (cond, tr) = split_exprs(expr); + // assert!(cond.is_none()); + // assert_eq!(tr.unwrap(), range!(lower = 1672531199000000000)); + // + // // reduces the lower bound to a single expression + // let expr = "time" + // .as_expr() + // .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) + // .and( + // "time" + // .as_expr() + // .gt_eq(now() - lit(V::new_interval_dt(0, 500))), + // ); + // let (cond, tr) = split_exprs(expr); + // assert!(cond.is_none()); + // assert_eq!(tr.unwrap(), range!(lower = 1672531199500000000)); + // + // let expr = "time" + // .as_expr() + // .lt_eq(now() - lit(V::new_interval_dt(0, 1000))); + // let (cond, tr) = split_exprs(expr); + // assert!(cond.is_none()); + // assert_eq!(tr.unwrap(), range!(upper = 1672531199000000000)); + // + // // reduces the upper bound to a single expression + // let expr = "time" + // .as_expr() + // .lt_eq(now() + lit(V::new_interval_dt(0, 1000))) + // .and( + // "time" + // .as_expr() + // .lt_eq(now() + lit(V::new_interval_dt(0, 500))), + // ); + // let (cond, tr) = split_exprs(expr); + // assert!(cond.is_none()); + // assert_eq!(tr.unwrap(), range!(upper = 1672531200500000000)); + // + // let expr = "time" + // .as_expr() + // .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) + // .and("time".as_expr().lt(now())); + // let (cond, tr) = split_exprs(expr); + // assert!(cond.is_none()); + // assert_eq!( + // tr.unwrap(), + // range!(lower=1672531199000000000, upper ex=1672531200000000000) + // ); + // + // let expr = "time" + // .as_expr() + // .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) + // .and("cpu".as_expr().eq(lit("cpu0"))); + // let (cond, tr) = split_exprs(expr); + // assert_eq!(cond.unwrap().to_string(), r#"cpu = Utf8("cpu0")"#); + // assert_eq!(tr.unwrap(), range!(lower = 1672531199000000000)); + // + // let expr = "time".as_expr().eq(lit_timestamp_nano(0)); + // let (cond, tr) = split_exprs(expr); + // assert!(cond.is_none()); + // assert_eq!(tr.unwrap(), range!(list = 0)); + // + // let expr = "instance" + // .as_expr() + // .eq(lit("instance-01")) + // .or("instance".as_expr().eq(lit("instance-02"))) + // .and( + // "time" + // .as_expr() + // .gt_eq(now() - lit(V::new_interval_dt(0, 1000))), + // ); + // let (cond, tr) = split_exprs(expr); + // assert_eq!( + // cond.unwrap().to_string(), + // r#"instance = Utf8("instance-01") OR instance = Utf8("instance-02")"# + // ); + // assert_eq!(tr.unwrap(), range!(lower = 1672531199000000000)); + // + // let expr = "time" + // .as_expr() + // .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) + // .and("time".as_expr().lt(now())) + // .and( + // "cpu" + // .as_expr() + // .eq(lit("cpu0")) + // .or("cpu".as_expr().eq(lit("cpu1"))), + // ); + // let (cond, tr) = split_exprs(expr); + // assert_eq!( + // cond.unwrap().to_string(), + // r#"cpu = Utf8("cpu0") OR cpu = Utf8("cpu1")"# + // ); + // assert_eq!( + // tr.unwrap(), + // range!(lower=1672531199000000000, upper ex=1672531200000000000) + // ); + // + // // time >= now - 60s AND time < now() OR cpu = 'cpu0' OR cpu = 'cpu1' + // // + // // Split the time range, despite using the disjunction (OR) operator + // let expr = "time" + // .as_expr() + // .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) + // .and("time".as_expr().lt(now())) + // .or("cpu" + // .as_expr() + // .eq(lit("cpu0")) + // .or("cpu".as_expr().eq(lit("cpu1")))); + // let (cond, tr) = split_exprs(expr); + // assert_eq!( + // cond.unwrap().to_string(), + // r#"cpu = Utf8("cpu0") OR cpu = Utf8("cpu1")"# + // ); + // assert_eq!( + // tr.unwrap(), + // range!(lower=1672531199000000000, upper ex=1672531200000000000) + // ); + // + // // time = 0 OR time = 10 AND cpu = 'cpu0' + // let expr = "time".as_expr().eq(lit_timestamp_nano(0)).or("time" + // .as_expr() + // .eq(lit_timestamp_nano(10)) + // .and("cpu".as_expr().eq(lit("cpu0")))); + // let (cond, tr) = split_exprs(expr); + // assert_eq!(cond.unwrap().to_string(), r#"cpu = Utf8("cpu0")"#); + // assert_eq!(tr.unwrap(), range!(list = 0, 10)); + // + // // no time + // let expr = "f64".as_expr().gt_eq(lit(19.5_f64)).or(binary_expr( + // "f64".as_expr(), + // Operator::RegexMatch, + // lit("foo"), + // )); + // let (cond, tr) = split_exprs(expr); + // assert_eq!( + // cond.unwrap().to_string(), + // r#"f64 >= Float64(19.5) OR (f64 ~ Utf8("foo"))"# + // ); + // assert!(tr.is_none()); + // + // // fallible + // + // let expr = "time" + // .as_expr() + // .eq(lit_timestamp_nano(0)) + // .or("time".as_expr().gt(now())); + // let (cond, tr) = split_exprs(expr); + // assert_eq!(cond.unwrap().to_string(), r#"Boolean(false)"#); + // assert!(tr.is_none()); + // } - test_helpers::maybe_start_logging(); - - let props = execution_props(); - let (schemas, _) = new_schemas(); - let simplify_context = - SimplifyContext::new(&props).with_schema(Arc::clone(&schemas.df_schema)); - let simplifier = ExprSimplifier::new(simplify_context); - - let rewrite = |expr| { - rewrite_time_range_exprs(expr, &simplifier) - .unwrap() - .to_string() - }; - - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 1000))); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531199000000000, None)"# - ); - - // reduces the lower bound to a single expression - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) - .and( - "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 500))), - ); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531199500000000, None)"# - ); - - let expr = "time" - .as_expr() - .lt_eq(now() - lit(V::new_interval_dt(0, 1000))); - assert_eq!( - rewrite(expr), - r#"time <= TimestampNanosecond(1672531199000000000, None)"# - ); - - // reduces the upper bound to a single expression - let expr = "time" - .as_expr() - .lt_eq(now() + lit(V::new_interval_dt(0, 1000))) - .and( - "time" - .as_expr() - .lt_eq(now() + lit(V::new_interval_dt(0, 500))), - ); - assert_eq!( - rewrite(expr), - r#"time <= TimestampNanosecond(1672531200500000000, None)"# - ); - - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) - .and("time".as_expr().lt(now())); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531199000000000, None) AND time < TimestampNanosecond(1672531200000000000, None)"# - ); - - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 1000))) - .and("cpu".as_expr().eq(lit("cpu0"))); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531199000000000, None) AND cpu = Utf8("cpu0")"# - ); - - let expr = "time".as_expr().eq(lit_timestamp_nano(0)); - assert_eq!(rewrite(expr), r#"time = TimestampNanosecond(0, None)"#); - - let expr = "instance" - .as_expr() - .eq(lit("instance-01")) - .or("instance".as_expr().eq(lit("instance-02"))) - .and( - "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 60_000))), - ); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531140000000000, None) AND (instance = Utf8("instance-01") OR instance = Utf8("instance-02"))"# - ); - - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 60_000))) - .and("time".as_expr().lt(now())) - .and( - "cpu" - .as_expr() - .eq(lit("cpu0")) - .or("cpu".as_expr().eq(lit("cpu1"))), - ); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531140000000000, None) AND time < TimestampNanosecond(1672531200000000000, None) AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"# - ); - - // time >= now - 60s AND time < now() OR cpu = 'cpu0' OR cpu = 'cpu1' - // - // Expects the time range to be combined with a conjunction (AND) - let expr = "time" - .as_expr() - .gt_eq(now() - lit(V::new_interval_dt(0, 60_000))) - .and("time".as_expr().lt(now())) - .or("cpu" - .as_expr() - .eq(lit("cpu0")) - .or("cpu".as_expr().eq(lit("cpu1")))); - assert_eq!( - rewrite(expr), - r#"time >= TimestampNanosecond(1672531140000000000, None) AND time < TimestampNanosecond(1672531200000000000, None) AND (cpu = Utf8("cpu0") OR cpu = Utf8("cpu1"))"# - ); - - // time = 0 OR time = 10 AND cpu = 'cpu0' - let expr = "time".as_expr().eq(lit_timestamp_nano(0)).or("time" - .as_expr() - .eq(lit_timestamp_nano(10)) - .and("cpu".as_expr().eq(lit("cpu0")))); - assert_eq!( - rewrite(expr), - r#"(time = TimestampNanosecond(0, None) OR time = TimestampNanosecond(10, None)) AND cpu = Utf8("cpu0")"# - ); - - // no time - let expr = "f64".as_expr().gt_eq(lit(19.5_f64)).or(binary_expr( - "f64".as_expr(), - Operator::RegexMatch, - lit("foo"), - )); - assert_eq!( - rewrite(expr), - "f64 >= Float64(19.5) OR (f64 ~ Utf8(\"foo\"))" - ); - - // fallible - - let expr = "time" - .as_expr() - .eq(lit_timestamp_nano(0)) - .or("time".as_expr().gt(now())); - assert_eq!(rewrite(expr), "Boolean(false)"); - } #[test] fn test_lower_bound_cmp() { let (a, b) = (LowerBound::unbounded(), LowerBound::unbounded());