Merge branch 'main' into old/persist-completion-observer

pull/24376/head
Dom 2023-01-18 09:33:55 +00:00 committed by GitHub
commit dbaa814a8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 891 additions and 323 deletions

4
Cargo.lock generated
View File

@ -5779,9 +5779,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.24.1"
version = "1.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d9f76183f91ecfb55e1d7d5602bd1d979e38a3a522fe900241cf195624d67ae"
checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb"
dependencies = [
"autocfg",
"bytes",

View File

@ -8,7 +8,7 @@ license.workspace = true
[dependencies] # In alphabetical order
nom = { version = "7", default-features = false, features = ["std"] }
once_cell = "1"
chrono = { version = "0.4", default-features = false }
chrono = { version = "0.4", default-features = false, features = ["std"] }
chrono-tz = { version = "0.8" }
workspace-hack = { path = "../workspace-hack" }

View File

@ -847,7 +847,7 @@ mod tests {
assert_eq!(got.len(), 1);
assert_eq!(got.head(), "foo");
assert_eq!(*got, vec!["foo"]); // deref
assert_eq!(format!("{}", got), "foo");
assert_eq!(got.to_string(), "foo");
let (_, got) =
OneOrMoreString::separated_list1("Expects one or more")("foo , bar,foobar").unwrap();
@ -855,7 +855,7 @@ mod tests {
assert_eq!(got.head(), "foo");
assert_eq!(got.tail(), vec!["bar", "foobar"]);
assert_eq!(*got, vec!["foo", "bar", "foobar"]); // deref
assert_eq!(format!("{}", got), "foo, bar, foobar");
assert_eq!(got.to_string(), "foo, bar, foobar");
// Fallible cases
@ -889,7 +889,7 @@ mod tests {
assert_eq!(got.len(), 1);
assert_eq!(got.head().unwrap(), "foo");
assert_eq!(*got, vec!["foo"]); // deref
assert_eq!(format!("{}", got), "foo");
assert_eq!(got.to_string(), "foo");
let (_, got) =
ZeroOrMoreString::separated_list1("Expects one or more")("foo , bar,foobar").unwrap();
@ -897,7 +897,7 @@ mod tests {
assert_eq!(got.head().unwrap(), "foo");
assert_eq!(got.tail(), vec!["bar", "foobar"]);
assert_eq!(*got, vec!["foo", "bar", "foobar"]); // deref
assert_eq!(format!("{}", got), "foo, bar, foobar");
assert_eq!(got.to_string(), "foo, bar, foobar");
// should not panic
let got = ZeroOrMoreString::new(vec![]);

View File

@ -176,7 +176,7 @@ mod test {
let (rem, got) = create_database("DATABASE telegraf WITH DURATION 5m").unwrap();
assert_eq!(rem, "");
assert_eq!(got.name, "telegraf".into());
assert_eq!(format!("{}", got.duration.unwrap()), "5m");
assert_eq!(got.duration.unwrap().to_string(), "5m");
let (rem, got) = create_database("DATABASE telegraf WITH REPLICATION 10").unwrap();
assert_eq!(rem, "");
@ -186,7 +186,7 @@ mod test {
let (rem, got) = create_database("DATABASE telegraf WITH SHARD DURATION 6m").unwrap();
assert_eq!(rem, "");
assert_eq!(got.name, "telegraf".into());
assert_eq!(format!("{}", got.shard_duration.unwrap()), "6m");
assert_eq!(got.shard_duration.unwrap().to_string(), "6m");
let (rem, got) = create_database("DATABASE telegraf WITH NAME \"5 minutes\"").unwrap();
assert_eq!(rem, "");
@ -196,9 +196,9 @@ mod test {
let (rem, got) = create_database("DATABASE telegraf WITH DURATION 5m REPLICATION 10 SHARD DURATION 6m NAME \"5 minutes\"").unwrap();
assert_eq!(rem, "");
assert_eq!(got.name, "telegraf".into());
assert_eq!(format!("{}", got.duration.unwrap()), "5m");
assert_eq!(got.duration.unwrap().to_string(), "5m");
assert_eq!(got.replication.unwrap(), 10);
assert_eq!(format!("{}", got.shard_duration.unwrap()), "6m");
assert_eq!(got.shard_duration.unwrap().to_string(), "6m");
assert_eq!(got.retention_name.unwrap(), "5 minutes".into());
// Fallible

View File

@ -81,17 +81,17 @@ mod test {
// Measurement name expressed as an identifier
let (_, got) = delete_statement("DELETE FROM foo").unwrap();
assert_eq!(format!("{}", got), "DELETE FROM foo");
assert_eq!(got.to_string(), "DELETE FROM foo");
// Measurement name expressed as a regular expression
let (_, got) = delete_statement("DELETE FROM /foo/").unwrap();
assert_eq!(format!("{}", got), "DELETE FROM /foo/");
assert_eq!(got.to_string(), "DELETE FROM /foo/");
let (_, got) = delete_statement("DELETE FROM foo WHERE time > 10").unwrap();
assert_eq!(format!("{}", got), "DELETE FROM foo WHERE time > 10");
assert_eq!(got.to_string(), "DELETE FROM foo WHERE time > 10");
let (_, got) = delete_statement("DELETE WHERE time > 10").unwrap();
assert_eq!(format!("{}", got), "DELETE WHERE time > 10");
assert_eq!(got.to_string(), "DELETE WHERE time > 10");
// Fallible cases
assert_expect_error!(

View File

@ -67,7 +67,7 @@ mod test {
let (_, got) = drop_measurement("MEASUREMENT \"foo\"").unwrap();
assert_eq!(got, DropMeasurementStatement { name: "foo".into() });
// validate Display
assert_eq!(format!("{}", got), "DROP MEASUREMENT foo");
assert_eq!(got.to_string(), "DROP MEASUREMENT foo");
// Fallible cases
assert_expect_error!(

View File

@ -102,24 +102,24 @@ mod test {
let (remain, got) = explain_statement("EXPLAIN SELECT val from temp").unwrap();
assert_eq!(remain, ""); // assert that all input was consumed
assert_matches!(got.options, None);
assert_eq!(format!("{}", got), "EXPLAIN SELECT val FROM temp");
assert_eq!(got.to_string(), "EXPLAIN SELECT val FROM temp");
let (remain, got) = explain_statement("EXPLAIN VERBOSE SELECT val from temp").unwrap();
assert_eq!(remain, "");
assert_matches!(&got.options, Some(o) if *o == ExplainOption::Verbose);
assert_eq!(format!("{}", got), "EXPLAIN VERBOSE SELECT val FROM temp");
assert_eq!(got.to_string(), "EXPLAIN VERBOSE SELECT val FROM temp");
let (remain, got) = explain_statement("EXPLAIN ANALYZE SELECT val from temp").unwrap();
assert_eq!(remain, "");
assert_matches!(&got.options, Some(o) if *o == ExplainOption::Analyze);
assert_eq!(format!("{}", got), "EXPLAIN ANALYZE SELECT val FROM temp");
assert_eq!(got.to_string(), "EXPLAIN ANALYZE SELECT val FROM temp");
let (remain, got) =
explain_statement("EXPLAIN ANALYZE VERBOSE SELECT val from temp").unwrap();
assert_eq!(remain, "");
assert_matches!(&got.options, Some(o) if *o == ExplainOption::AnalyzeVerbose);
assert_eq!(
format!("{}", got),
got.to_string(),
"EXPLAIN ANALYZE VERBOSE SELECT val FROM temp"
);

View File

@ -1,6 +1,6 @@
use crate::common::ws0;
use crate::identifier::unquoted_identifier;
use crate::internal::{expect, ParseResult};
use crate::internal::{expect, Error, ParseError, ParseResult};
use crate::keywords::keyword;
use crate::literal::literal_regex;
use crate::{
@ -15,6 +15,7 @@ use nom::combinator::{cut, map, opt, value};
use nom::multi::{many0, separated_list0};
use nom::sequence::{delimited, pair, preceded, separated_pair, terminated, tuple};
use std::fmt::{Display, Formatter, Write};
use std::ops::Neg;
/// An InfluxQL arithmetic expression.
#[derive(Clone, Debug, PartialEq)]
@ -66,9 +67,6 @@ pub enum Expr {
/// A DISTINCT `<identifier>` expression.
Distinct(Identifier),
/// Unary operation such as + 5 or - 1h3m
UnaryOp(UnaryOperator, Box<Expr>),
/// Function call
Call {
/// Represents the name of the function call.
@ -98,6 +96,12 @@ impl From<Literal> for Expr {
}
}
impl From<i64> for Expr {
fn from(v: i64) -> Self {
Self::Literal(v.into())
}
}
impl From<u64> for Expr {
fn from(v: u64) -> Self {
Self::Literal(v.into())
@ -116,6 +120,18 @@ impl From<u64> for Box<Expr> {
}
}
impl From<i64> for Box<Expr> {
fn from(v: i64) -> Self {
Self::new(v.into())
}
}
impl From<i32> for Box<Expr> {
fn from(v: i32) -> Self {
Self::new((v as i64).into())
}
}
impl Display for Expr {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
@ -127,7 +143,6 @@ impl Display for Expr {
}
Self::BindParameter(v) => write!(f, "{}", v)?,
Self::Literal(v) => write!(f, "{}", v)?,
Self::UnaryOp(op, e) => write!(f, "{}{}", op, e)?,
Self::Binary { lhs, op, rhs } => write!(f, "{} {} {}", lhs, op, rhs)?,
Self::Nested(e) => write!(f, "({})", e)?,
Self::Call { name, args } => {
@ -291,7 +306,46 @@ where
let (i, e) = factor::<T>(i)?;
Ok((i, Expr::UnaryOp(op, e.into())))
// Unary minus is expressed by negating existing literals,
// or producing a binary arithmetic expression that multiplies
// Expr `e` by -1
let e = if op == UnaryOperator::Minus {
match e {
Expr::Literal(Literal::Float(v)) => Expr::Literal(Literal::Float(v.neg())),
Expr::Literal(Literal::Integer(v)) => Expr::Literal(Literal::Integer(v.neg())),
Expr::Literal(Literal::Duration(v)) => Expr::Literal(Literal::Duration((v.0.neg()).into())),
Expr::Literal(Literal::Unsigned(v)) => {
if v == (i64::MAX as u64) + 1 {
// The minimum i64 is parsed as a Literal::Unsigned, as it exceeds
// int64::MAX, so we explicitly handle that case per
// https://github.com/influxdata/influxql/blob/7e7d61973256ffeef4b99edd0a89f18a9e52fa2d/parser.go#L2750-L2755
Expr::Literal(Literal::Integer(i64::MIN))
} else {
return Err(nom::Err::Failure(Error::from_message(
i,
"constant overflows signed integer",
)));
}
},
v @ Expr::VarRef { .. } | v @ Expr::Call { .. } | v @ Expr::Nested(..) | v @ Expr::BindParameter(..) => {
Expr::Binary {
lhs: Box::new(Expr::Literal(Literal::Integer(-1))),
op: BinaryOperator::Mul,
rhs: Box::new(v),
}
}
_ => {
return Err(nom::Err::Failure(Error::from_message(
i,
"unexpected unary expression: expected literal integer, float, duration, field, function or parenthesis",
)))
}
}
} else {
e
};
Ok((i, e))
}
/// Parse a parenthesis expression.
@ -478,7 +532,7 @@ mod test {
use super::*;
use crate::literal::literal_no_regex;
use crate::parameter::parameter;
use crate::{assert_expect_error, assert_failure, binary_op, nested, param, unary, var_ref};
use crate::{assert_expect_error, assert_failure, binary_op, nested, param, var_ref};
struct TestParsers;
@ -511,33 +565,25 @@ mod test {
// are nested deeper in the AST.
let (_, got) = arithmetic_expression("5 % -3 | 2").unwrap();
assert_eq!(
got,
binary_op!(binary_op!(5, Mod, unary!(-3)), BitwiseOr, 2)
);
assert_eq!(got, binary_op!(binary_op!(5, Mod, -3), BitwiseOr, 2));
let (_, got) = arithmetic_expression("-3 | 2 % 5").unwrap();
assert_eq!(
got,
binary_op!(unary!(-3), BitwiseOr, binary_op!(2, Mod, 5))
);
assert_eq!(got, binary_op!(-3, BitwiseOr, binary_op!(2, Mod, 5)));
let (_, got) = arithmetic_expression("5 % 2 | -3").unwrap();
assert_eq!(
got,
binary_op!(binary_op!(5, Mod, 2), BitwiseOr, unary!(-3))
);
assert_eq!(got, binary_op!(binary_op!(5, Mod, 2), BitwiseOr, -3));
let (_, got) = arithmetic_expression("2 | -3 % 5").unwrap();
assert_eq!(
got,
binary_op!(2, BitwiseOr, binary_op!(unary!(-3), Mod, 5))
);
assert_eq!(got, binary_op!(2, BitwiseOr, binary_op!(-3, Mod, 5)));
let (_, got) = arithmetic_expression("5 - -(3 | 2)").unwrap();
assert_eq!(
got,
binary_op!(5, Sub, unary!(-nested!(binary_op!(3, BitwiseOr, 2))))
binary_op!(
5,
Sub,
binary_op!(-1, Mul, nested!(binary_op!(3, BitwiseOr, 2)))
)
);
let (_, got) = arithmetic_expression("2 | 5 % 3").unwrap();
@ -554,22 +600,35 @@ mod test {
let (_, got) = arithmetic_expression("5- -(3|2)").unwrap();
assert_eq!(
got,
binary_op!(5, Sub, unary!(-nested!(binary_op!(3, BitwiseOr, 2))))
binary_op!(
5,
Sub,
binary_op!(-1, Mul, nested!(binary_op!(3, BitwiseOr, 2)))
)
);
// whitespace is not significant between unary operators
let (_, got) = arithmetic_expression("5+-(3|2)").unwrap();
assert_eq!(
got,
binary_op!(5, Add, unary!(-nested!(binary_op!(3, BitwiseOr, 2))))
binary_op!(
5,
Add,
binary_op!(-1, Mul, nested!(binary_op!(3, BitwiseOr, 2)))
)
);
// Test unary max signed
let (_, got) = arithmetic_expression("-9223372036854775808").unwrap();
assert_eq!(got, Expr::Literal(Literal::Integer(-9223372036854775808)));
// Fallible cases
// invalid operator / incomplete expression
assert_failure!(arithmetic_expression("5 || 3"));
// TODO: skip until https://github.com/influxdata/influxdb_iox/issues/5663 is implemented
// assert_failure!(arithmetic("5+--(3|2)"));
assert_failure!(arithmetic_expression("5+--(3|2)"));
// exceeds i64::MIN
assert_failure!(arithmetic_expression("-9223372036854775809"));
}
#[test]
@ -587,7 +646,7 @@ mod test {
// * https://github.com/influxdata/influxql/blob/7e7d61973256ffeef4b99edd0a89f18a9e52fa2d/parser.go#L2551
let (rem, got) = var_ref("db.rp.foo").unwrap();
assert_eq!(got, var_ref!("db.rp.foo"));
assert_eq!(format!("{}", got), r#""db.rp.foo""#);
assert_eq!(got.to_string(), r#""db.rp.foo""#);
assert_eq!(rem, "");
// with cast operator
@ -619,109 +678,107 @@ mod test {
// Unquoted
let (rem, id) = segmented_identifier("part0").unwrap();
assert_eq!(rem, "");
assert_eq!(format!("{}", id), "part0");
assert_eq!(id.to_string(), "part0");
// id.id
let (rem, id) = segmented_identifier("part1.part0").unwrap();
assert_eq!(rem, "");
assert_eq!(format!("{}", id), "\"part1.part0\"");
assert_eq!(id.to_string(), "\"part1.part0\"");
// id..id
let (rem, id) = segmented_identifier("part2..part0").unwrap();
assert_eq!(rem, "");
assert_eq!(format!("{}", id), "\"part2..part0\"");
assert_eq!(id.to_string(), "\"part2..part0\"");
// id.id.id
let (rem, id) = segmented_identifier("part2.part1.part0").unwrap();
assert_eq!(rem, "");
assert_eq!(format!("{}", id), "\"part2.part1.part0\"");
assert_eq!(id.to_string(), "\"part2.part1.part0\"");
// "id"."id".id
let (rem, id) = segmented_identifier(r#""part 2"."part 1".part0"#).unwrap();
assert_eq!(rem, "");
assert_eq!(format!("{}", id), "\"part 2.part 1.part0\"");
assert_eq!(id.to_string(), "\"part 2.part 1.part0\"");
// Only parses 3 segments
let (rem, id) = segmented_identifier("part2.part1.part0.foo").unwrap();
assert_eq!(rem, ".foo");
assert_eq!(format!("{}", id), "\"part2.part1.part0\"");
assert_eq!(id.to_string(), "\"part2.part1.part0\"");
// Quoted
let (rem, id) = segmented_identifier("\"part0\"").unwrap();
assert_eq!(rem, "");
assert_eq!(format!("{}", id), "part0");
assert_eq!(id.to_string(), "part0");
// Additional test cases, with compatibility proven via https://go.dev/play/p/k2150CJocVl
let (rem, id) = segmented_identifier(r#""part" 2"."part 1".part0"#).unwrap();
assert_eq!(rem, r#" 2"."part 1".part0"#);
assert_eq!(format!("{}", id), "part");
assert_eq!(id.to_string(), "part");
let (rem, id) = segmented_identifier(r#""part" 2."part 1".part0"#).unwrap();
assert_eq!(rem, r#" 2."part 1".part0"#);
assert_eq!(format!("{}", id), "part");
assert_eq!(id.to_string(), "part");
let (rem, id) = segmented_identifier(r#""part "2"."part 1".part0"#).unwrap();
assert_eq!(rem, r#"2"."part 1".part0"#);
assert_eq!(format!("{}", id), r#""part ""#);
assert_eq!(id.to_string(), r#""part ""#);
let (rem, id) = segmented_identifier(r#""part ""2"."part 1".part0"#).unwrap();
assert_eq!(rem, r#""2"."part 1".part0"#);
assert_eq!(format!("{}", id), r#""part ""#);
assert_eq!(id.to_string(), r#""part ""#);
}
#[test]
fn test_display_expr() {
let (_, e) = arithmetic_expression("5 + 51").unwrap();
let got = format!("{}", e);
assert_eq!(got, "5 + 51");
#[track_caller]
fn assert_display_expr(input: &str, expected: &str) {
let (_, e) = arithmetic_expression(input).unwrap();
assert_eq!(e.to_string(), expected);
}
let (_, e) = arithmetic_expression("5 + -10").unwrap();
let got = format!("{}", e);
assert_eq!(got, "5 + -10");
let (_, e) = arithmetic_expression("-(5 % 6)").unwrap();
let got = format!("{}", e);
assert_eq!(got, "-(5 % 6)");
assert_display_expr("5 + 51", "5 + 51");
assert_display_expr("5 + -10", "5 + -10");
assert_display_expr("-(5 % 6)", "-1 * (5 % 6)");
// vary spacing
let (_, e) = arithmetic_expression("( 5 + 6 ) * -( 7+ 8)").unwrap();
let got = format!("{}", e);
assert_eq!(got, "(5 + 6) * -(7 + 8)");
assert_display_expr("( 5 + 6 ) * -( 7+ 8)", "(5 + 6) * -1 * (7 + 8)");
// multiple unary and parenthesis
let (_, e) = arithmetic_expression("(-(5 + 6) & -+( 7 + 8 ))").unwrap();
let got = format!("{}", e);
assert_eq!(got, "(-(5 + 6) & -+(7 + 8))");
assert_display_expr("(-(5 + 6) & -+( 7 + 8 ))", "(-1 * (5 + 6) & -1 * (7 + 8))");
// unquoted identifier
let (_, e) = arithmetic_expression("foo + 5").unwrap();
let got = format!("{}", e);
assert_eq!(got, "foo + 5");
assert_display_expr("foo + 5", "foo + 5");
// identifier, negated
assert_display_expr("-foo + 5", "-1 * foo + 5");
// bind parameter identifier
let (_, e) = arithmetic_expression("foo + $0").unwrap();
let got = format!("{}", e);
assert_eq!(got, "foo + $0");
assert_display_expr("foo + $0", "foo + $0");
// quoted identifier
let (_, e) = arithmetic_expression(r#""foo" + 'bar'"#).unwrap();
let got = format!("{}", e);
assert_eq!(got, r#"foo + 'bar'"#);
assert_display_expr(r#""foo" + 'bar'"#, r#"foo + 'bar'"#);
// quoted identifier, negated
assert_display_expr(r#"-"foo" + 'bar'"#, r#"-1 * foo + 'bar'"#);
// quoted identifier with spaces, negated
assert_display_expr(r#"-"foo bar" + 'bar'"#, r#"-1 * "foo bar" + 'bar'"#);
// Duration
let (_, e) = arithmetic_expression("- 6h30m").unwrap();
let got = format!("{}", e);
assert_eq!(got, "-6h30m");
assert_display_expr("6h30m", "6h30m");
// Negated
assert_display_expr("- 6h30m", "-6h30m");
// Validate other expression types
assert_eq!(format!("{}", Expr::Wildcard(None)), "*");
assert_eq!(Expr::Wildcard(None).to_string(), "*");
assert_eq!(
format!("{}", Expr::Wildcard(Some(WildcardType::Field))),
Expr::Wildcard(Some(WildcardType::Field)).to_string(),
"*::field"
);
assert_eq!(format!("{}", Expr::Distinct("foo".into())), "DISTINCT foo");
assert_eq!(Expr::Distinct("foo".into()).to_string(), "DISTINCT foo");
// can't parse literal regular expressions as part of an arithmetic expression
assert_failure!(arithmetic_expression(r#""foo" + /^(no|match)$/"#));
@ -734,34 +791,30 @@ mod test {
#[test]
fn test_call() {
#[track_caller]
fn assert_call(input: &str, expected: &str) {
let (_, ex) = call(input).unwrap();
assert_eq!(ex.to_string(), expected);
}
// These tests validate a `Call` expression and also it's Display implementation.
// We don't need to validate Expr trees, as we do that in the conditional and arithmetic
// tests.
// No arguments
let (_, ex) = call("FN()").unwrap();
let got = format!("{}", ex);
assert_eq!(got, "FN()");
assert_call("FN()", "FN()");
// Single argument with surrounding whitespace
let (_, ex) = call("FN ( 1 )").unwrap();
let got = format!("{}", ex);
assert_eq!(got, "FN(1)");
assert_call("FN ( 1 )", "FN(1)");
// Multiple arguments with varying whitespace
let (_, ex) = call("FN ( 1,2\n,3,\t4 )").unwrap();
let got = format!("{}", ex);
assert_eq!(got, "FN(1, 2, 3, 4)");
assert_call("FN ( 1,2\n,3,\t4 )", "FN(1, 2, 3, 4)");
// Arguments as expressions
let (_, ex) = call("FN ( 1 + 2, foo, 'bar' )").unwrap();
let got = format!("{}", ex);
assert_eq!(got, "FN(1 + 2, foo, 'bar')");
assert_call("FN ( 1 + 2, foo, 'bar' )", "FN(1 + 2, foo, 'bar')");
// A single regular expression argument
let (_, ex) = call("FN ( /foo/ )").unwrap();
let got = format!("{}", ex);
assert_eq!(got, "FN(/foo/)");
assert_call("FN ( /foo/ )", "FN(/foo/)");
// Fallible cases
@ -779,23 +832,19 @@ mod test {
#[test]
fn test_var_ref_display() {
assert_eq!(
format!(
"{}",
Expr::VarRef {
name: "foo".into(),
data_type: None
}
),
Expr::VarRef {
name: "foo".into(),
data_type: None
}
.to_string(),
"foo"
);
assert_eq!(
format!(
"{}",
Expr::VarRef {
name: "foo".into(),
data_type: Some(VarRefDataType::Field)
}
),
Expr::VarRef {
name: "foo".into(),
data_type: Some(VarRefDataType::Field)
}
.to_string(),
"foo::field"
);
}

View File

@ -242,8 +242,7 @@ mod test {
use super::*;
use crate::expression::arithmetic::Expr;
use crate::{
assert_expect_error, assert_failure, binary_op, call, cond_op, grouped, regex, unary,
var_ref,
assert_expect_error, assert_failure, binary_op, call, cond_op, grouped, regex, var_ref,
};
impl From<Expr> for ConditionalExpression {
@ -252,6 +251,22 @@ mod test {
}
}
impl From<i32> for Box<ConditionalExpression> {
fn from(v: i32) -> Self {
Self::new(ConditionalExpression::Expr(Box::new(Expr::Literal(
(v as i64).into(),
))))
}
}
impl From<i64> for Box<ConditionalExpression> {
fn from(v: i64) -> Self {
Self::new(ConditionalExpression::Expr(Box::new(Expr::Literal(
v.into(),
))))
}
}
impl From<u64> for Box<ConditionalExpression> {
fn from(v: u64) -> Self {
Self::new(ConditionalExpression::Expr(Box::new(Expr::Literal(
@ -315,7 +330,7 @@ mod test {
assert_eq!(got, *cond_op!(var_ref!("foo"), Gt, binary_op!(5, Add, 6)));
let (_, got) = conditional_expression("5 <= -6").unwrap();
assert_eq!(got, *cond_op!(5, LtEq, unary!(-6)));
assert_eq!(got, *cond_op!(5, LtEq, -6));
// simple expressions
let (_, got) = conditional_expression("true").unwrap();
@ -330,7 +345,7 @@ mod test {
assert_eq!(got, *cond_op!(var_ref!("foo"), Gt, binary_op!(5, Add, 6)));
let (_, got) = conditional_expression("5<=-6").unwrap();
assert_eq!(got, *cond_op!(5, LtEq, unary!(-6)));
assert_eq!(got, *cond_op!(5, LtEq, -6));
// var refs with cast operator
let (_, got) = conditional_expression("foo::integer = 5").unwrap();
@ -422,7 +437,6 @@ mod test {
#[test]
fn test_display_expr() {
let (_, e) = conditional_expression("foo = 'test'").unwrap();
let got = format!("{}", e);
assert_eq!(got, "foo = 'test'");
assert_eq!(e.to_string(), "foo = 'test'");
}
}

View File

@ -2,6 +2,6 @@
source: influxdb_influxql_parser/src/expression/walk.rs
expression: "walk_expr(\"5 + 6\")"
---
0: Literal(Unsigned(5))
1: Literal(Unsigned(6))
2: Binary { lhs: Literal(Unsigned(5)), op: Add, rhs: Literal(Unsigned(6)) }
0: Literal(Integer(5))
1: Literal(Integer(6))
2: Binary { lhs: Literal(Integer(5)), op: Add, rhs: Literal(Integer(6)) }

View File

@ -2,6 +2,6 @@
source: influxdb_influxql_parser/src/expression/walk.rs
expression: "walk_expr_mut(\"5 + 6\")"
---
0: Literal(Unsigned(5))
1: Literal(Unsigned(6))
2: Binary { lhs: Literal(Unsigned(5)), op: Add, rhs: Literal(Unsigned(6)) }
0: Literal(Integer(5))
1: Literal(Integer(6))
2: Binary { lhs: Literal(Integer(5)), op: Add, rhs: Literal(Integer(6)) }

View File

@ -2,12 +2,12 @@
source: influxdb_influxql_parser/src/expression/walk.rs
expression: "walk_expression(\"5 + 6 = 2 + 9\")"
---
0: Arithmetic(Literal(Unsigned(5)))
1: Arithmetic(Literal(Unsigned(6)))
2: Arithmetic(Binary { lhs: Literal(Unsigned(5)), op: Add, rhs: Literal(Unsigned(6)) })
3: Conditional(Expr(Binary { lhs: Literal(Unsigned(5)), op: Add, rhs: Literal(Unsigned(6)) }))
4: Arithmetic(Literal(Unsigned(2)))
5: Arithmetic(Literal(Unsigned(9)))
6: Arithmetic(Binary { lhs: Literal(Unsigned(2)), op: Add, rhs: Literal(Unsigned(9)) })
7: Conditional(Expr(Binary { lhs: Literal(Unsigned(2)), op: Add, rhs: Literal(Unsigned(9)) }))
8: Conditional(Binary { lhs: Expr(Binary { lhs: Literal(Unsigned(5)), op: Add, rhs: Literal(Unsigned(6)) }), op: Eq, rhs: Expr(Binary { lhs: Literal(Unsigned(2)), op: Add, rhs: Literal(Unsigned(9)) }) })
0: Arithmetic(Literal(Integer(5)))
1: Arithmetic(Literal(Integer(6)))
2: Arithmetic(Binary { lhs: Literal(Integer(5)), op: Add, rhs: Literal(Integer(6)) })
3: Conditional(Expr(Binary { lhs: Literal(Integer(5)), op: Add, rhs: Literal(Integer(6)) }))
4: Arithmetic(Literal(Integer(2)))
5: Arithmetic(Literal(Integer(9)))
6: Arithmetic(Binary { lhs: Literal(Integer(2)), op: Add, rhs: Literal(Integer(9)) })
7: Conditional(Expr(Binary { lhs: Literal(Integer(2)), op: Add, rhs: Literal(Integer(9)) }))
8: Conditional(Binary { lhs: Expr(Binary { lhs: Literal(Integer(5)), op: Add, rhs: Literal(Integer(6)) }), op: Eq, rhs: Expr(Binary { lhs: Literal(Integer(2)), op: Add, rhs: Literal(Integer(9)) }) })

View File

@ -75,23 +75,6 @@ macro_rules! call {
};
}
/// Constructs a [crate::expression::arithmetic::Expr::UnaryOp] expression.
#[macro_export]
macro_rules! unary {
(- $EXPR:expr) => {
$crate::expression::arithmetic::Expr::UnaryOp(
$crate::expression::arithmetic::UnaryOperator::Minus,
$EXPR.into(),
)
};
(+ $EXPR:expr) => {
$crate::expression::arithmetic::Expr::UnaryOp(
$crate::expression::arithmetic::UnaryOperator::Plus,
$EXPR.into(),
)
};
}
/// Constructs a [crate::expression::arithmetic::Expr::Distinct] expression.
#[macro_export]
macro_rules! distinct {

View File

@ -66,7 +66,6 @@ pub fn walk_expr<B>(
walk_expr(lhs, visit)?;
walk_expr(rhs, visit)?;
}
Expr::UnaryOp(_, n) => walk_expr(n, visit)?,
Expr::Nested(n) => walk_expr(n, visit)?,
Expr::Call { args, .. } => {
args.iter().try_for_each(|n| walk_expr(n, visit))?;
@ -91,7 +90,6 @@ pub fn walk_expr_mut<B>(
walk_expr_mut(lhs, visit)?;
walk_expr_mut(rhs, visit)?;
}
Expr::UnaryOp(_, n) => walk_expr_mut(n, visit)?,
Expr::Nested(n) => walk_expr_mut(n, visit)?,
Expr::Call { args, .. } => {
args.iter_mut().try_for_each(|n| walk_expr_mut(n, visit))?;
@ -140,7 +138,7 @@ mod test {
match e {
ExpressionMut::Arithmetic(n) => match n {
Expr::VarRef { name, .. } => *name = format!("c_{}", name).into(),
Expr::Literal(Literal::Unsigned(v)) => *v *= 10,
Expr::Literal(Literal::Integer(v)) => *v *= 10,
Expr::Literal(Literal::Regex(v)) => *v = format!("c_{}", v.0).into(),
_ => {}
},
@ -152,7 +150,7 @@ mod test {
}
std::ops::ControlFlow::Continue(())
});
assert_eq!(format!("{}", expr), "c_foo + c_bar + 50 !~ /c_str/")
assert_eq!(expr.to_string(), "c_foo + c_bar + 50 !~ /c_str/")
}
#[test]
@ -197,11 +195,11 @@ mod test {
walk_expr_mut::<()>(&mut expr, &mut |e| {
match e {
Expr::VarRef { name, .. } => *name = format!("c_{}", name).into(),
Expr::Literal(Literal::Unsigned(v)) => *v *= 10,
Expr::Literal(Literal::Integer(v)) => *v *= 10,
_ => {}
}
std::ops::ControlFlow::Continue(())
});
assert_eq!(format!("{}", expr), "c_foo + c_bar + 50")
assert_eq!(expr.to_string(), "c_foo + c_bar + 50")
}
}

View File

@ -130,11 +130,11 @@ mod test {
#[test]
fn test_identifier_display() {
// Identifier properly escapes specific characters and quotes output
let got = format!("{}", Identifier("quick\n\t\\\"'draw \u{1f47d}".into()));
let got = Identifier("quick\n\t\\\"'draw \u{1f47d}".into()).to_string();
assert_eq!(got, r#""quick\n \\\"'draw 👽""#);
// Identifier displays unquoted output
let got = format!("{}", Identifier("quick_draw".into()));
let got = Identifier("quick_draw".into()).to_string();
assert_eq!(got, "quick_draw");
}

View File

@ -36,8 +36,8 @@ impl<'a> ParseError<'a> for Error<&'a str> {
}
/// Applies a function returning a [`ParseResult`] over the result of the `parser`.
/// If the parser returns an error, the result will be mapped to a [`nom::Err::Failure`]
/// with the specified `message` for additional context.
/// If the parser returns an error, the result will be mapped to an unrecoverable
/// [`nom::Err::Failure`] with the specified `message` for additional context.
pub fn map_fail<'a, O1, O2, E: ParseError<'a>, E2, F, G>(
message: &'static str,
mut parser: F,
@ -56,6 +56,27 @@ where
}
}
/// Applies a function returning a [`ParseResult`] over the result of the `parser`.
/// If the parser returns an error, the result will be mapped to a recoverable
/// [`nom::Err::Error`] with the specified `message` for additional context.
pub fn map_error<'a, O1, O2, E: ParseError<'a>, E2, F, G>(
message: &'static str,
mut parser: F,
mut f: G,
) -> impl FnMut(&'a str) -> ParseResult<&'a str, O2, E>
where
F: Parser<&'a str, O1, E>,
G: FnMut(O1) -> Result<O2, E2>,
{
move |input| {
let (input, o1) = parser.parse(input)?;
match f(o1) {
Ok(o2) => Ok((input, o2)),
Err(_) => Err(nom::Err::Error(E::from_message(input, message))),
}
}
}
/// Transforms a [`nom::Err::Error`] to a [`nom::Err::Failure`] using `message` for additional
/// context.
pub fn expect<'a, E: ParseError<'a>, F, O>(

View File

@ -124,26 +124,26 @@ mod test {
fn test_parse_statements() {
// Parse a single statement, without a terminator
let got = parse_statements("SHOW MEASUREMENTS").unwrap();
assert_eq!(format!("{}", got.first().unwrap()), "SHOW MEASUREMENTS");
assert_eq!(got.first().unwrap().to_string(), "SHOW MEASUREMENTS");
// Parse a single statement, with a terminator
let got = parse_statements("SHOW MEASUREMENTS;").unwrap();
assert_eq!(format!("{}", got[0]), "SHOW MEASUREMENTS");
assert_eq!(got[0].to_string(), "SHOW MEASUREMENTS");
// Parse multiple statements with whitespace
let got = parse_statements("SHOW MEASUREMENTS;\nSHOW MEASUREMENTS LIMIT 1").unwrap();
assert_eq!(format!("{}", got[0]), "SHOW MEASUREMENTS");
assert_eq!(format!("{}", got[1]), "SHOW MEASUREMENTS LIMIT 1");
assert_eq!(got[0].to_string(), "SHOW MEASUREMENTS");
assert_eq!(got[1].to_string(), "SHOW MEASUREMENTS LIMIT 1");
// Parse multiple statements with a terminator in quotes, ensuring it is not interpreted as
// a terminator
let got =
parse_statements("SHOW MEASUREMENTS WITH MEASUREMENT = \";\";SHOW DATABASES").unwrap();
assert_eq!(
format!("{}", got[0]),
got[0].to_string(),
"SHOW MEASUREMENTS WITH MEASUREMENT = \";\""
);
assert_eq!(format!("{}", got[1]), "SHOW DATABASES");
assert_eq!(got[1].to_string(), "SHOW DATABASES");
// Parses a statement with a comment
let got = parse_statements(
@ -151,7 +151,7 @@ mod test {
)
.unwrap();
assert_eq!(
format!("{}", got[0]),
got[0].to_string(),
"SELECT idle FROM cpu WHERE host = 'host1'"
);
@ -161,24 +161,24 @@ mod test {
)
.unwrap();
assert_eq!(
format!("{}", got[0]),
got[0].to_string(),
"SELECT idle FROM cpu WHERE host = 'host1'"
);
assert_eq!(format!("{}", got[1]), "SHOW DATABASES");
assert_eq!(got[1].to_string(), "SHOW DATABASES");
// Parses statement with inline comment
let got = parse_statements(r#"SELECT idle FROM cpu WHERE/* time > now() AND */host = 'host1' --GROUP BY host fill(null)"#).unwrap();
assert_eq!(
format!("{}", got[0]),
got[0].to_string(),
"SELECT idle FROM cpu WHERE host = 'host1'"
);
// Returns error for invalid statement
let got = parse_statements("BAD SQL").unwrap_err();
assert_eq!(format!("{}", got), "invalid SQL statement at pos 0");
assert_eq!(got.to_string(), "invalid SQL statement at pos 0");
// Returns error for invalid statement after first
let got = parse_statements("SHOW MEASUREMENTS;BAD SQL").unwrap_err();
assert_eq!(format!("{}", got), "invalid SQL statement at pos 18");
assert_eq!(got.to_string(), "invalid SQL statement at pos 18");
}
}

View File

@ -1,7 +1,7 @@
//! Types and parsers for literals.
use crate::common::ws0;
use crate::internal::{map_fail, ParseResult};
use crate::internal::{map_error, map_fail, ParseResult};
use crate::keywords::keyword;
use crate::string::{regex, single_quoted_string, Regex};
use crate::{impl_tuple_clause, write_escaped};
@ -32,6 +32,9 @@ const NANOS_PER_WEEK: i64 = 7 * NANOS_PER_DAY;
/// Primitive InfluxQL literal values, such as strings and regular expressions.
#[derive(Clone, Debug, PartialEq)]
pub enum Literal {
/// Signed integer literal.
Integer(i64),
/// Unsigned integer literal.
Unsigned(u64),
@ -63,6 +66,12 @@ impl From<u64> for Literal {
}
}
impl From<i64> for Literal {
fn from(v: i64) -> Self {
Self::Integer(v)
}
}
impl From<f64> for Literal {
fn from(v: f64) -> Self {
Self::Float(v)
@ -90,6 +99,7 @@ impl From<Regex> for Literal {
impl Display for Literal {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Integer(v) => write!(f, "{}", v),
Self::Unsigned(v) => write!(f, "{}", v),
Self::Float(v) => write!(f, "{}", v),
Self::String(v) => {
@ -112,7 +122,29 @@ impl Display for Literal {
/// INTEGER ::= [0-9]+
/// ```
fn integer(i: &str) -> ParseResult<&str, i64> {
map_fail("unable to parse integer", digit1, &str::parse)(i)
map_error("unable to parse integer", digit1, &str::parse)(i)
}
/// Parse an InfluxQL integer to a [`Literal::Integer`] or [`Literal::Unsigned`]
/// if the string overflows. This behavior is consistent with [InfluxQL].
///
/// InfluxQL defines an integer as follows
///
/// ```text
/// INTEGER ::= [0-9]+
/// ```
///
/// [InfluxQL]: https://github.com/influxdata/influxql/blob/7e7d61973256ffeef4b99edd0a89f18a9e52fa2d/parser.go#L2669-L2675
fn integer_literal(i: &str) -> ParseResult<&str, Literal> {
map_fail(
"unable to parse integer due to overflow",
digit1,
|s: &str| {
s.parse::<i64>()
.map(Literal::Integer)
.or_else(|_| s.parse::<u64>().map(Literal::Unsigned))
},
)(i)
}
/// Parse an unsigned InfluxQL integer.
@ -224,11 +256,17 @@ static DIVISORS: [(i64, &str); 8] = [
impl Display for Duration {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self.0 {
let v = if self.0.is_negative() {
write!(f, "-")?;
-self.0
} else {
self.0
};
match v {
0 => f.write_str("0s")?,
mut i => {
// only return the divisors that are > self
for (div, unit) in DIVISORS.iter().filter(|(div, _)| self.0 > *div) {
for (div, unit) in DIVISORS.iter().filter(|(div, _)| v > *div) {
let units = i / div;
if units > 0 {
write!(f, "{}{}", units, unit)?;
@ -290,7 +328,7 @@ pub(crate) fn literal_no_regex(i: &str) -> ParseResult<&str, Literal> {
// NOTE: order is important, as floats should be tested before durations and integers.
map(float, Literal::Float),
map(duration, Literal::Duration),
map(unsigned_integer, Literal::Unsigned),
integer_literal,
map(single_quoted_string, Literal::String),
map(boolean, Literal::Boolean),
))(i)
@ -313,8 +351,14 @@ mod test {
#[test]
fn test_literal_no_regex() {
// Whole numbers are parsed first as a signed integer, and if that overflows,
// tries an unsigned integer, which is consistent with InfluxQL
let (_, got) = literal_no_regex("42").unwrap();
assert_matches!(got, Literal::Unsigned(42));
assert_matches!(got, Literal::Integer(42));
// > i64::MAX + 1 should be parsed as an unsigned integer
let (_, got) = literal_no_regex("9223372036854775808").unwrap();
assert_matches!(got, Literal::Unsigned(9223372036854775808));
let (_, got) = literal_no_regex("42.69").unwrap();
assert_matches!(got, Literal::Float(v) if v == 42.69);
@ -452,16 +496,18 @@ mod test {
#[test]
fn test_display_duration() {
let (_, d) = duration("3w2h15ms").unwrap();
let got = format!("{}", d);
assert_eq!(got, "3w2h15ms");
assert_eq!(d.to_string(), "3w2h15ms");
let (_, d) = duration("5s5s5s5s5s").unwrap();
let got = format!("{}", d);
assert_eq!(got, "25s");
assert_eq!(d.to_string(), "25s");
let d = Duration(0);
let got = format!("{}", d);
assert_eq!(got, "0s");
assert_eq!(d.to_string(), "0s");
// Negative duration
let (_, d) = duration("3w2h15ms").unwrap();
let d = Duration(-d.0);
assert_eq!(d.to_string(), "-3w2h15ms");
let d = Duration(
20 * NANOS_PER_WEEK
@ -473,8 +519,7 @@ mod test {
+ 8 * NANOS_PER_MICRO
+ 500,
);
let got = format!("{}", d);
assert_eq!(got, "20w6d13h11m10s9ms8us500ns");
assert_eq!(d.to_string(), "20w6d13h11m10s9ms8us500ns");
}
#[test]

View File

@ -93,15 +93,15 @@ mod test {
#[test]
fn test_bind_parameter_display() {
// BindParameter displays quoted output
let got = format!("{}", BindParameter("from foo".into()));
let got = BindParameter("from foo".into()).to_string();
assert_eq!(got, r#"$"from foo""#);
// BindParameter displays quoted and escaped output
let got = format!("{}", BindParameter("from\nfoo".into()));
let got = BindParameter("from\nfoo".into()).to_string();
assert_eq!(got, r#"$"from\nfoo""#);
// BindParameter displays unquoted output
let got = format!("{}", BindParameter("quick_draw".into()));
let got = BindParameter("quick_draw".into()).to_string();
assert_eq!(got, "$quick_draw");
}
}

View File

@ -703,20 +703,20 @@ mod test {
#[test]
fn test_select_statement() {
let (_, got) = select_statement("SELECT value FROM foo").unwrap();
assert_eq!(format!("{}", got), "SELECT value FROM foo");
assert_eq!(got.to_string(), "SELECT value FROM foo");
let (_, got) =
select_statement(r#"SELECT f1,/f2/, f3 AS "a field" FROM foo WHERE host =~ /c1/"#)
.unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
r#"SELECT f1, /f2/, f3 AS "a field" FROM foo WHERE host =~ /c1/"#
);
let (_, got) =
select_statement("SELECT sum(value) FROM foo GROUP BY time(5m), host").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
r#"SELECT sum(value) FROM foo GROUP BY TIME(5m), host"#
);
@ -724,7 +724,7 @@ mod test {
let (_, got) =
select_statement("SELECT sum(value) FROM foo GROUP BY time(5m * 10), host").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
r#"SELECT sum(value) FROM foo GROUP BY TIME(5m * 10), host"#
);
@ -737,37 +737,37 @@ mod test {
select_statement("SELECT sum(value) FROM foo GROUP BY time(5m), host FILL(previous)")
.unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
r#"SELECT sum(value) FROM foo GROUP BY TIME(5m), host FILL(PREVIOUS)"#
);
let (_, got) = select_statement("SELECT value FROM foo ORDER BY DESC").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
r#"SELECT value FROM foo ORDER BY TIME DESC"#
);
let (_, got) = select_statement("SELECT value FROM foo ORDER BY TIME ASC").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
r#"SELECT value FROM foo ORDER BY TIME ASC"#
);
let (_, got) = select_statement("SELECT value FROM foo LIMIT 5").unwrap();
assert_eq!(format!("{}", got), r#"SELECT value FROM foo LIMIT 5"#);
assert_eq!(got.to_string(), r#"SELECT value FROM foo LIMIT 5"#);
let (_, got) = select_statement("SELECT value FROM foo OFFSET 20").unwrap();
assert_eq!(format!("{}", got), r#"SELECT value FROM foo OFFSET 20"#);
assert_eq!(got.to_string(), r#"SELECT value FROM foo OFFSET 20"#);
let (_, got) = select_statement("SELECT value FROM foo SLIMIT 25").unwrap();
assert_eq!(format!("{}", got), r#"SELECT value FROM foo SLIMIT 25"#);
assert_eq!(got.to_string(), r#"SELECT value FROM foo SLIMIT 25"#);
let (_, got) = select_statement("SELECT value FROM foo SOFFSET 220").unwrap();
assert_eq!(format!("{}", got), r#"SELECT value FROM foo SOFFSET 220"#);
assert_eq!(got.to_string(), r#"SELECT value FROM foo SOFFSET 220"#);
let (_, got) = select_statement("SELECT value FROM foo tz('Australia/Hobart')").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
r#"SELECT value FROM foo TZ('Australia/Hobart')"#
);

View File

@ -106,22 +106,22 @@ mod test {
// Validate each of the `SHOW` statements are accepted
let (_, got) = show_statement("SHOW DATABASES").unwrap();
assert_eq!(format!("{}", got), "SHOW DATABASES");
assert_eq!(got.to_string(), "SHOW DATABASES");
let (_, got) = show_statement("SHOW FIELD KEYS").unwrap();
assert_eq!(format!("{}", got), "SHOW FIELD KEYS");
assert_eq!(got.to_string(), "SHOW FIELD KEYS");
let (_, got) = show_statement("SHOW MEASUREMENTS").unwrap();
assert_eq!(format!("{}", got), "SHOW MEASUREMENTS");
assert_eq!(got.to_string(), "SHOW MEASUREMENTS");
let (_, got) = show_statement("SHOW RETENTION POLICIES ON \"foo\"").unwrap();
assert_eq!(format!("{}", got), "SHOW RETENTION POLICIES ON foo");
assert_eq!(got.to_string(), "SHOW RETENTION POLICIES ON foo");
let (_, got) = show_statement("SHOW TAG KEYS").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG KEYS");
assert_eq!(got.to_string(), "SHOW TAG KEYS");
let (_, got) = show_statement("SHOW TAG VALUES WITH KEY = some_key").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG VALUES WITH KEY = some_key");
assert_eq!(got.to_string(), "SHOW TAG VALUES WITH KEY = some_key");
// Fallible cases

View File

@ -100,36 +100,36 @@ mod test {
fn test_show_field_keys() {
// No optional clauses
let (_, got) = show_field_keys("FIELD KEYS").unwrap();
assert_eq!(format!("{}", got), "SHOW FIELD KEYS");
assert_eq!(got.to_string(), "SHOW FIELD KEYS");
let (_, got) = show_field_keys("FIELD KEYS ON db").unwrap();
assert_eq!(format!("{}", got), "SHOW FIELD KEYS ON db");
assert_eq!(got.to_string(), "SHOW FIELD KEYS ON db");
// measurement selection using name
let (_, got) = show_field_keys("FIELD KEYS FROM db..foo").unwrap();
assert_eq!(format!("{}", got), "SHOW FIELD KEYS FROM db..foo");
assert_eq!(got.to_string(), "SHOW FIELD KEYS FROM db..foo");
// measurement selection using regex
let (_, got) = show_field_keys("FIELD KEYS FROM /foo/").unwrap();
assert_eq!(format!("{}", got), "SHOW FIELD KEYS FROM /foo/");
assert_eq!(got.to_string(), "SHOW FIELD KEYS FROM /foo/");
// measurement selection using list
let (_, got) = show_field_keys("FIELD KEYS FROM /foo/ , bar, \"foo bar\"").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW FIELD KEYS FROM /foo/, bar, \"foo bar\""
);
let (_, got) = show_field_keys("FIELD KEYS LIMIT 1").unwrap();
assert_eq!(format!("{}", got), "SHOW FIELD KEYS LIMIT 1");
assert_eq!(got.to_string(), "SHOW FIELD KEYS LIMIT 1");
let (_, got) = show_field_keys("FIELD KEYS OFFSET 2").unwrap();
assert_eq!(format!("{}", got), "SHOW FIELD KEYS OFFSET 2");
assert_eq!(got.to_string(), "SHOW FIELD KEYS OFFSET 2");
// all optional clauses
let (_, got) = show_field_keys("FIELD KEYS ON db FROM /foo/ LIMIT 1 OFFSET 2").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW FIELD KEYS ON db FROM /foo/ LIMIT 1 OFFSET 2"
);

View File

@ -104,40 +104,40 @@ mod test {
fn test_show_tag_keys() {
// No optional clauses
let (_, got) = show_tag_keys("KEYS").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG KEYS");
assert_eq!(got.to_string(), "SHOW TAG KEYS");
let (_, got) = show_tag_keys("KEYS ON db").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG KEYS ON db");
assert_eq!(got.to_string(), "SHOW TAG KEYS ON db");
// measurement selection using name
let (_, got) = show_tag_keys("KEYS FROM db..foo").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG KEYS FROM db..foo");
assert_eq!(got.to_string(), "SHOW TAG KEYS FROM db..foo");
// measurement selection using regex
let (_, got) = show_tag_keys("KEYS FROM /foo/").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG KEYS FROM /foo/");
assert_eq!(got.to_string(), "SHOW TAG KEYS FROM /foo/");
// measurement selection using list
let (_, got) = show_tag_keys("KEYS FROM /foo/ , bar, \"foo bar\"").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW TAG KEYS FROM /foo/, bar, \"foo bar\""
);
let (_, got) = show_tag_keys("KEYS WHERE foo = 'bar'").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG KEYS WHERE foo = 'bar'");
assert_eq!(got.to_string(), "SHOW TAG KEYS WHERE foo = 'bar'");
let (_, got) = show_tag_keys("KEYS LIMIT 1").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG KEYS LIMIT 1");
assert_eq!(got.to_string(), "SHOW TAG KEYS LIMIT 1");
let (_, got) = show_tag_keys("KEYS OFFSET 2").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG KEYS OFFSET 2");
assert_eq!(got.to_string(), "SHOW TAG KEYS OFFSET 2");
// all optional clauses
let (_, got) =
show_tag_keys("KEYS ON db FROM /foo/ WHERE foo = 'bar' LIMIT 1 OFFSET 2").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW TAG KEYS ON db FROM /foo/ WHERE foo = 'bar' LIMIT 1 OFFSET 2"
);

View File

@ -243,25 +243,22 @@ mod test {
fn test_show_tag_values() {
// No optional clauses
let (_, got) = show_tag_values("VALUES WITH KEY = some_key").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG VALUES WITH KEY = some_key");
assert_eq!(got.to_string(), "SHOW TAG VALUES WITH KEY = some_key");
let (_, got) = show_tag_values("VALUES ON db WITH KEY = some_key").unwrap();
assert_eq!(
format!("{}", got),
"SHOW TAG VALUES ON db WITH KEY = some_key"
);
assert_eq!(got.to_string(), "SHOW TAG VALUES ON db WITH KEY = some_key");
// measurement selection using name
let (_, got) = show_tag_values("VALUES FROM db..foo WITH KEY = some_key").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW TAG VALUES FROM db..foo WITH KEY = some_key"
);
// measurement selection using regex
let (_, got) = show_tag_values("VALUES FROM /foo/ WITH KEY = some_key").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW TAG VALUES FROM /foo/ WITH KEY = some_key"
);
@ -269,25 +266,25 @@ mod test {
let (_, got) =
show_tag_values("VALUES FROM /foo/ , bar, \"foo bar\" WITH KEY = some_key").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW TAG VALUES FROM /foo/, bar, \"foo bar\" WITH KEY = some_key"
);
let (_, got) = show_tag_values("VALUES WITH KEY = some_key WHERE foo = 'bar'").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW TAG VALUES WITH KEY = some_key WHERE foo = 'bar'"
);
let (_, got) = show_tag_values("VALUES WITH KEY = some_key LIMIT 1").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW TAG VALUES WITH KEY = some_key LIMIT 1"
);
let (_, got) = show_tag_values("VALUES WITH KEY = some_key OFFSET 2").unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW TAG VALUES WITH KEY = some_key OFFSET 2"
);
@ -297,12 +294,12 @@ mod test {
)
.unwrap();
assert_eq!(
format!("{}", got),
got.to_string(),
"SHOW TAG VALUES ON db FROM /foo/ WITH KEY = some_key WHERE foo = 'bar' LIMIT 1 OFFSET 2"
);
let (_, got) = show_tag_values("VALUES WITH KEY IN( foo )").unwrap();
assert_eq!(format!("{}", got), "SHOW TAG VALUES WITH KEY IN (foo)");
assert_eq!(got.to_string(), "SHOW TAG VALUES WITH KEY IN (foo)");
// Fallible cases are tested by the various combinator functions
}

View File

@ -2,8 +2,8 @@
source: influxdb_influxql_parser/src/visit.rs
expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE host = \"node1\")\n WHERE region =~ /west/ AND value > 5\n GROUP BY TIME(5m), host\n FILL(previous)\n ORDER BY TIME DESC\n LIMIT 1 OFFSET 2\n SLIMIT 3 SOFFSET 4\n TZ('Australia/Hobart')\n \"#)"
---
- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }"
- "pre_visit_select_field: Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }"
- "pre_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }"
@ -42,8 +42,8 @@ expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE
- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }"
- "post_visit_select_measurement_selection: Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })"
- "post_visit_select_from_clause: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }"
- "pre_visit_where_clause: WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })"
- "pre_visit_conditional_expression: Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } }"
- "pre_visit_where_clause: WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })"
- "pre_visit_conditional_expression: Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } }"
- "pre_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }"
- "pre_visit_conditional_expression: Expr(VarRef { name: Identifier(\"region\"), data_type: None })"
- "pre_visit_expr: VarRef { name: Identifier(\"region\"), data_type: None }"
@ -54,18 +54,18 @@ expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE
- "post_visit_expr: Literal(Regex(Regex(\"west\")))"
- "post_visit_conditional_expression: Expr(Literal(Regex(Regex(\"west\"))))"
- "post_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }"
- "pre_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) }"
- "pre_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) }"
- "pre_visit_conditional_expression: Expr(VarRef { name: Identifier(\"value\"), data_type: None })"
- "pre_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }"
- "post_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }"
- "post_visit_conditional_expression: Expr(VarRef { name: Identifier(\"value\"), data_type: None })"
- "pre_visit_conditional_expression: Expr(Literal(Unsigned(5)))"
- "pre_visit_expr: Literal(Unsigned(5))"
- "post_visit_expr: Literal(Unsigned(5))"
- "post_visit_conditional_expression: Expr(Literal(Unsigned(5)))"
- "post_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) }"
- "post_visit_conditional_expression: Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } }"
- "post_visit_where_clause: WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })"
- "pre_visit_conditional_expression: Expr(Literal(Integer(5)))"
- "pre_visit_expr: Literal(Integer(5))"
- "post_visit_expr: Literal(Integer(5))"
- "post_visit_conditional_expression: Expr(Literal(Integer(5)))"
- "post_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) }"
- "post_visit_conditional_expression: Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } }"
- "post_visit_where_clause: WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })"
- "pre_visit_group_by_clause: ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }"
- "pre_visit_select_dimension: Time { interval: Literal(Duration(Duration(300000000000))), offset: None }"
- "pre_visit_expr: Literal(Duration(Duration(300000000000)))"
@ -88,6 +88,6 @@ expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE
- "post_visit_soffset_clause: SOffsetClause(4)"
- "pre_visit_timezone_clause: TimeZoneClause(Australia/Hobart)"
- "post_visit_timezone_clause: TimeZoneClause(Australia/Hobart)"
- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"

View File

@ -2,8 +2,8 @@
source: influxdb_influxql_parser/src/visit_mut.rs
expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE host = \"node1\")\n WHERE region =~ /west/ AND value > 5\n GROUP BY TIME(5m), host\n FILL(previous)\n ORDER BY TIME DESC\n LIMIT 1 OFFSET 2\n SLIMIT 3 SOFFSET 4\n TZ('Australia/Hobart')\n \"#)"
---
- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }"
- "pre_visit_select_field: Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }"
- "pre_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }"
@ -42,8 +42,8 @@ expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE
- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None }"
- "post_visit_select_measurement_selection: Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })"
- "post_visit_select_from_clause: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }"
- "pre_visit_where_clause: WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })"
- "pre_visit_conditional_expression: Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } }"
- "pre_visit_where_clause: WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })"
- "pre_visit_conditional_expression: Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } }"
- "pre_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }"
- "pre_visit_conditional_expression: Expr(VarRef { name: Identifier(\"region\"), data_type: None })"
- "pre_visit_expr: VarRef { name: Identifier(\"region\"), data_type: None }"
@ -54,18 +54,18 @@ expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE
- "post_visit_expr: Literal(Regex(Regex(\"west\")))"
- "post_visit_conditional_expression: Expr(Literal(Regex(Regex(\"west\"))))"
- "post_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }"
- "pre_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) }"
- "pre_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) }"
- "pre_visit_conditional_expression: Expr(VarRef { name: Identifier(\"value\"), data_type: None })"
- "pre_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }"
- "post_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }"
- "post_visit_conditional_expression: Expr(VarRef { name: Identifier(\"value\"), data_type: None })"
- "pre_visit_conditional_expression: Expr(Literal(Unsigned(5)))"
- "pre_visit_expr: Literal(Unsigned(5))"
- "post_visit_expr: Literal(Unsigned(5))"
- "post_visit_conditional_expression: Expr(Literal(Unsigned(5)))"
- "post_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) }"
- "post_visit_conditional_expression: Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } }"
- "post_visit_where_clause: WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })"
- "pre_visit_conditional_expression: Expr(Literal(Integer(5)))"
- "pre_visit_expr: Literal(Integer(5))"
- "post_visit_expr: Literal(Integer(5))"
- "post_visit_conditional_expression: Expr(Literal(Integer(5)))"
- "post_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) }"
- "post_visit_conditional_expression: Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } }"
- "post_visit_where_clause: WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })"
- "pre_visit_group_by_clause: ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }"
- "pre_visit_select_dimension: Time { interval: Literal(Duration(Duration(300000000000))), offset: None }"
- "pre_visit_expr: Literal(Duration(Duration(300000000000)))"
@ -88,6 +88,6 @@ expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE
- "post_visit_soffset_clause: SOffsetClause(4)"
- "pre_visit_timezone_clause: TimeZoneClause(Australia/Hobart)"
- "post_visit_timezone_clause: TimeZoneClause(Australia/Hobart)"
- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Unsigned(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"

View File

@ -29,7 +29,7 @@ macro_rules! assert_expect_error {
input: _,
message: got,
}) => {
assert_eq!(format!("{}", got), $MSG)
assert_eq!(got.to_string(), $MSG)
}
e => panic!("Expected Failure(Syntax(_, msg), got {:?}", e),
}

View File

@ -1172,7 +1172,6 @@ impl Visitable for Expr {
};
let visitor = match self {
Self::UnaryOp(_, expr) => expr.accept(visitor),
Self::Call { args, .. } => args.iter().try_fold(visitor, |v, e| e.accept(v)),
Self::Binary { lhs, op: _, rhs } => {
let visitor = lhs.accept(visitor)?;

View File

@ -1110,7 +1110,6 @@ impl VisitableMut for Expr {
};
match self {
Self::UnaryOp(_, expr) => expr.accept(visitor)?,
Self::Call { args, .. } => args.iter_mut().try_for_each(|e| e.accept(visitor))?,
Self::Binary { lhs, op: _, rhs } => {
lhs.accept(visitor)?;
@ -1818,7 +1817,6 @@ mod test {
.clone();
let mut vis = AddLimit;
statement.accept(&mut vis).unwrap();
let res = format!("{}", statement);
assert_eq!(res, "SELECT usage FROM cpu LIMIT 10");
assert_eq!(statement.to_string(), "SELECT usage FROM cpu LIMIT 10");
}
}

View File

@ -14,6 +14,7 @@ use executor::DedicatedExecutor;
use object_store::DynObjectStore;
use parquet_file::storage::StorageId;
use trace::span::{SpanExt, SpanRecorder};
mod cross_rt_stream;
use std::{collections::HashMap, sync::Arc};

View File

@ -2,6 +2,7 @@
//! DataFusion
use super::{
cross_rt_stream::CrossRtStream,
non_null_checker::NonNullCheckerNode,
seriesset::{series::Either, SeriesSet},
split::StreamSplitNode,
@ -40,7 +41,9 @@ use datafusion::{
coalesce_partitions::CoalescePartitionsExec,
displayable,
planner::{DefaultPhysicalPlanner, ExtensionPlanner},
EmptyRecordBatchStream, ExecutionPlan, PhysicalPlanner, SendableRecordBatchStream,
stream::RecordBatchStreamAdapter,
EmptyRecordBatchStream, ExecutionPlan, PhysicalPlanner, RecordBatchStream,
SendableRecordBatchStream,
},
prelude::*,
};
@ -409,12 +412,20 @@ impl IOxSessionContext {
let task_context = Arc::new(TaskContext::from(self.inner()));
self.run(async move {
let stream = physical_plan.execute(partition, task_context)?;
let stream = TracedStream::new(stream, span, physical_plan);
Ok(Box::pin(stream) as _)
})
.await
let stream = self
.run(async move {
let stream = physical_plan.execute(partition, task_context)?;
Ok(TracedStream::new(stream, span, physical_plan))
})
.await?;
// Wrap the resulting stream into `CrossRtStream`. This is required because polling the DataFusion result stream
// actually drives the (potentially CPU-bound) work. We need to make sure that this work stays within the
// dedicated executor because otherwise this may block the top-level tokio/tonic runtime which may lead to
// requests timetouts (either for new requests, metrics or even for HTTP2 pings on the active connection).
let schema = stream.schema();
let stream = CrossRtStream::new_with_arrow_error_stream(stream, self.exec.clone());
let stream = RecordBatchStreamAdapter::new(schema, stream);
Ok(Box::pin(stream))
}
/// Executes the SeriesSetPlans on the query executor, in
@ -442,24 +453,31 @@ impl IOxSessionContext {
let data = futures::stream::iter(plans)
.then(move |plan| {
let ctx = ctx.child_ctx("for plan");
Self::run_inner(exec.clone(), async move {
let SeriesSetPlan {
table_name,
plan,
tag_columns,
field_columns,
} = plan;
let exec = exec.clone();
let tag_columns = Arc::new(tag_columns);
async move {
let stream = Self::run_inner(exec.clone(), async move {
let SeriesSetPlan {
table_name,
plan,
tag_columns,
field_columns,
} = plan;
let physical_plan = ctx.create_physical_plan(&plan).await?;
let tag_columns = Arc::new(tag_columns);
let it = ctx.execute_stream(physical_plan).await?;
let physical_plan = ctx.create_physical_plan(&plan).await?;
SeriesSetConverter::default()
.convert(table_name, tag_columns, field_columns, it)
.await
})
let it = ctx.execute_stream(physical_plan).await?;
SeriesSetConverter::default()
.convert(table_name, tag_columns, field_columns, it)
.await
})
.await?;
Ok::<_, Error>(CrossRtStream::new_with_df_error_stream(stream, exec))
}
})
.try_flatten()
.try_filter_map(|series_set: SeriesSet| async move {
@ -564,11 +582,6 @@ impl IOxSessionContext {
}
}
/// Run the plan and return a record batch reader for reading the results
pub async fn run_logical_plan(&self, plan: LogicalPlan) -> Result<Vec<RecordBatch>> {
self.run_logical_plans(vec![plan]).await
}
/// plans and runs the plans in parallel and collects the results
/// run each plan in parallel and collect the results
async fn run_logical_plans(&self, plans: Vec<LogicalPlan>) -> Result<Vec<RecordBatch>> {
@ -603,7 +616,7 @@ impl IOxSessionContext {
Self::run_inner(self.exec.clone(), fut).await
}
pub async fn run_inner<Fut, T>(exec: DedicatedExecutor, fut: Fut) -> Result<T>
async fn run_inner<Fut, T>(exec: DedicatedExecutor, fut: Fut) -> Result<T>
where
Fut: std::future::Future<Output = Result<T>> + Send + 'static,
T: Send + 'static,

View File

@ -0,0 +1,373 @@
//! Tooling to pull [`Stream`]s from one tokio runtime into another.
//!
//! This is critical so that CPU heavy loads are not run on the same runtime as IO handling
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use arrow::error::ArrowError;
use datafusion::error::DataFusionError;
use executor::DedicatedExecutor;
use futures::{future::BoxFuture, ready, FutureExt, Stream, StreamExt};
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::ReceiverStream;
/// [`Stream`] that is calculated by one tokio runtime but can safely be pulled from another w/o stalling (esp. when the
/// calculating runtime is CPU-blocked).
pub struct CrossRtStream<T> {
/// Future that drives the underlying stream.
///
/// This is actually wrapped into [`DedicatedExecutor::spawn`] so it can be safely polled by the receiving runtime.
driver: BoxFuture<'static, ()>,
/// Flags if the [driver](Self::driver) returned [`Poll::Ready`].
driver_ready: bool,
/// Receiving stream.
///
/// This one can be polled from the receiving runtime.
inner: ReceiverStream<T>,
/// Signals that [`inner`](Self::inner) finished.
///
/// Note that we must also drive the [driver](Self::driver) even when the stream finished to allow proper state clean-ups.
inner_done: bool,
}
impl<T> std::fmt::Debug for CrossRtStream<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CrossRtStream")
.field("driver", &"...")
.field("driver_ready", &self.driver_ready)
.field("inner", &"...")
.field("inner_done", &self.inner_done)
.finish()
}
}
impl<T> CrossRtStream<T> {
/// Create new stream by producing a future that sends its state to the given [`Sender`].
///
/// This is an internal method. `f` should always be wrapped into [`DedicatedExecutor::spawn`] (except for testing purposes).
fn new_with_tx<F, Fut>(f: F) -> Self
where
F: FnOnce(Sender<T>) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
let (tx, rx) = channel(1);
let driver = f(tx).boxed();
Self {
driver,
driver_ready: false,
inner: ReceiverStream::new(rx),
inner_done: false,
}
}
}
impl<X, E> CrossRtStream<Result<X, E>>
where
X: Send + 'static,
E: Send + 'static,
{
/// Create new stream based on an existing stream that transports [`Result`]s.
///
/// Also receives an executor that actually executes the underlying stream as well as a converter that convets
/// [`executor::Error`] to the error type of the stream (so we can send potential crashes/panics).
fn new_with_error_stream<S, C>(stream: S, exec: DedicatedExecutor, converter: C) -> Self
where
S: Stream<Item = Result<X, E>> + Send + 'static,
C: Fn(executor::Error) -> E + Send + 'static,
{
Self::new_with_tx(|tx| {
// future to be run in the other runtime
let tx_captured = tx.clone();
let fut = async move {
tokio::pin!(stream);
while let Some(res) = stream.next().await {
if tx_captured.send(res).await.is_err() {
// receiver gone
return;
}
}
};
// future for this runtime (likely the tokio/tonic/web driver)
async move {
if let Err(e) = exec.spawn(fut).await {
let e = converter(e);
// last message, so we don't care about the receiver side
tx.send(Err(e)).await.ok();
}
}
})
}
}
impl<X> CrossRtStream<Result<X, DataFusionError>>
where
X: Send + 'static,
{
/// Create new stream based on an existing stream that transports [`Result`]s w/ [`DataFusionError`]s.
///
/// Also receives an executor that actually executes the underlying stream.
pub fn new_with_df_error_stream<S>(stream: S, exec: DedicatedExecutor) -> Self
where
S: Stream<Item = Result<X, DataFusionError>> + Send + 'static,
{
Self::new_with_error_stream(stream, exec, |e| {
DataFusionError::Context(
"Join Error (panic)".to_string(),
Box::new(DataFusionError::External(e.into())),
)
})
}
}
impl<X> CrossRtStream<Result<X, ArrowError>>
where
X: Send + 'static,
{
/// Create new stream based on an existing stream that transports [`Result`]s w/ [`ArrowError`]s.
///
/// Also receives an executor that actually executes the underlying stream.
pub fn new_with_arrow_error_stream<S>(stream: S, exec: DedicatedExecutor) -> Self
where
S: Stream<Item = Result<X, ArrowError>> + Send + 'static,
{
Self::new_with_error_stream(stream, exec, |e| ArrowError::ExternalError(e.into()))
}
}
impl<T> Stream for CrossRtStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
if !this.driver_ready {
let res = this.driver.poll_unpin(cx);
if res.is_ready() {
this.driver_ready = true;
}
}
if this.inner_done {
if this.driver_ready {
Poll::Ready(None)
} else {
Poll::Pending
}
} else {
match ready!(this.inner.poll_next_unpin(cx)) {
None => {
this.inner_done = true;
if this.driver_ready {
Poll::Ready(None)
} else {
Poll::Pending
}
}
Some(x) => Poll::Ready(Some(x)),
}
}
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use super::*;
use tokio::runtime::{Handle, RuntimeFlavor};
#[tokio::test]
async fn test_async_block() {
let exec = DedicatedExecutor::new_testing();
let barrier1 = Arc::new(tokio::sync::Barrier::new(2));
let barrier1_captured = Arc::clone(&barrier1);
let barrier2 = Arc::new(tokio::sync::Barrier::new(2));
let barrier2_captured = Arc::clone(&barrier2);
let mut stream = CrossRtStream::<Result<u8, executor::Error>>::new_with_error_stream(
futures::stream::once(async move {
barrier1_captured.wait().await;
barrier2_captured.wait().await;
Ok(1)
}),
exec,
std::convert::identity,
);
let mut f = stream.next();
ensure_pending(&mut f).await;
barrier1.wait().await;
ensure_pending(&mut f).await;
barrier2.wait().await;
let res = f.await.expect("streamed data");
assert_eq!(res, Ok(1));
}
#[tokio::test]
async fn test_sync_block() {
// This would deadlock if the stream payload would run within the same tokio runtime. To prevent any cheating
// (e.g. via channels), we ensure that the current runtime only has a single thread:
assert_eq!(
RuntimeFlavor::CurrentThread,
Handle::current().runtime_flavor()
);
let exec = DedicatedExecutor::new_testing();
let barrier1 = Arc::new(std::sync::Barrier::new(2));
let barrier1_captured = Arc::clone(&barrier1);
let barrier2 = Arc::new(std::sync::Barrier::new(2));
let barrier2_captured = Arc::clone(&barrier2);
let mut stream = CrossRtStream::<Result<u8, executor::Error>>::new_with_error_stream(
futures::stream::once(async move {
barrier1_captured.wait();
barrier2_captured.wait();
Ok(1)
}),
exec,
std::convert::identity,
);
let mut f = stream.next();
ensure_pending(&mut f).await;
barrier1.wait();
ensure_pending(&mut f).await;
barrier2.wait();
let res = f.await.expect("streamed data");
assert_eq!(res, Ok(1));
}
#[tokio::test]
async fn test_panic() {
let exec = DedicatedExecutor::new_testing();
let mut stream = CrossRtStream::<Result<(), executor::Error>>::new_with_error_stream(
futures::stream::once(async { panic!("foo") }),
exec,
std::convert::identity,
);
let e = stream
.next()
.await
.expect("stream not finished")
.unwrap_err();
assert_eq!(e.to_string(), "foo");
let none = stream.next().await;
assert!(none.is_none());
}
#[tokio::test]
async fn test_cancel_future() {
let exec = DedicatedExecutor::new_testing();
let barrier1 = Arc::new(tokio::sync::Barrier::new(2));
let barrier1_captured = Arc::clone(&barrier1);
let barrier2 = Arc::new(tokio::sync::Barrier::new(2));
let barrier2_captured = Arc::clone(&barrier2);
let mut stream = CrossRtStream::<Result<u8, executor::Error>>::new_with_error_stream(
futures::stream::once(async move {
barrier1_captured.wait().await;
barrier2_captured.wait().await;
Ok(1)
}),
exec,
std::convert::identity,
);
let mut f = stream.next();
// fire up stream
ensure_pending(&mut f).await;
barrier1.wait().await;
// cancel
drop(f);
barrier2.wait().await;
let res = stream.next().await.expect("streamed data");
assert_eq!(res, Ok(1));
}
#[tokio::test]
async fn test_cancel_stream() {
let exec = DedicatedExecutor::new_testing();
let barrier = Arc::new(tokio::sync::Barrier::new(2));
let barrier_captured = Arc::clone(&barrier);
let mut stream = CrossRtStream::<Result<u8, executor::Error>>::new_with_error_stream(
futures::stream::once(async move {
barrier_captured.wait().await;
// block forever
futures::future::pending::<()>().await;
// keep barrier Arc alive
drop(barrier_captured);
unreachable!()
}),
exec,
std::convert::identity,
);
let mut f = stream.next();
// fire up stream
ensure_pending(&mut f).await;
barrier.wait().await;
assert_eq!(Arc::strong_count(&barrier), 2);
// cancel
drop(f);
drop(stream);
tokio::time::timeout(Duration::from_secs(5), async {
loop {
if Arc::strong_count(&barrier) == 1 {
return;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
}
#[tokio::test]
async fn test_inner_future_driven_to_completion_after_stream_ready() {
let barrier = Arc::new(tokio::sync::Barrier::new(2));
let barrier_captured = Arc::clone(&barrier);
let mut stream = CrossRtStream::<u8>::new_with_tx(|tx| async move {
tx.send(1).await.ok();
drop(tx);
barrier_captured.wait().await;
});
let handle = tokio::spawn(async move { barrier.wait().await });
assert_eq!(stream.next().await, Some(1));
handle.await.unwrap();
}
async fn ensure_pending<F>(f: &mut F)
where
F: Future + Send + Unpin,
{
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(100)) => {}
_ = f => {panic!("not pending")},
}
}
}

View File

@ -18,7 +18,7 @@ use datafusion::prelude::Column;
use datafusion::sql::planner::ContextProvider;
use datafusion::sql::TableReference;
use influxdb_influxql_parser::expression::{
BinaryOperator, ConditionalExpression, ConditionalOperator, UnaryOperator, VarRefDataType,
BinaryOperator, ConditionalExpression, ConditionalOperator, VarRefDataType,
};
use influxdb_influxql_parser::select::{SLimitClause, SOffsetClause};
use influxdb_influxql_parser::{
@ -279,6 +279,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
})),
IQLExpr::BindParameter(_) => Err(DataFusionError::NotImplemented("parameter".into())),
IQLExpr::Literal(val) => match val {
Literal::Integer(v) => Ok(lit(ScalarValue::Int64(Some(*v)))),
Literal::Unsigned(v) => Ok(lit(ScalarValue::UInt64(Some(*v)))),
Literal::Float(v) => Ok(lit(*v)),
Literal::String(v) => Ok(lit(v.clone())),
@ -298,10 +299,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
},
},
IQLExpr::Distinct(_) => Err(DataFusionError::NotImplemented("DISTINCT".into())),
IQLExpr::UnaryOp(op, e) => match (op, self.expr_to_df_expr(scope, e, schema)?) {
(UnaryOperator::Minus, e) => Ok(Expr::Negative(Box::new(e))),
(UnaryOperator::Plus, e) => Ok(e),
},
IQLExpr::Call { name, args } => self.call_to_df_expr(scope, name, args, schema),
IQLExpr::Binary { lhs, op, rhs } => {
self.arithmetic_expr_to_df_expr(scope, lhs, *op, rhs, schema)

View File

@ -2,7 +2,7 @@ use crate::plan::influxql::field::field_by_name;
use crate::plan::influxql::field_mapper::map_type;
use datafusion::common::{DataFusionError, Result};
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
use influxdb_influxql_parser::expression::{Expr, UnaryOperator, VarRefDataType};
use influxdb_influxql_parser::expression::{Expr, VarRefDataType};
use influxdb_influxql_parser::literal::Literal;
use influxdb_influxql_parser::select::{Dimension, FromMeasurementClause, MeasurementSelection};
use itertools::Itertools;
@ -39,15 +39,9 @@ impl<'a> TypeEvaluator<'a> {
Expr::Nested(expr) => self.eval_type(expr)?,
Expr::Literal(Literal::Float(_)) => Some(VarRefDataType::Float),
Expr::Literal(Literal::Unsigned(_)) => Some(VarRefDataType::Unsigned),
Expr::Literal(Literal::Integer(_)) => Some(VarRefDataType::Integer),
Expr::Literal(Literal::String(_)) => Some(VarRefDataType::String),
Expr::Literal(Literal::Boolean(_)) => Some(VarRefDataType::Boolean),
Expr::UnaryOp(op, expr) => match (op, self.eval_type(expr)?) {
(UnaryOperator::Minus, Some(VarRefDataType::Unsigned)) => {
Some(VarRefDataType::Integer)
}
(_, Some(ft)) => Some(ft),
(_, None) => None,
},
// Remaining patterns are not valid field types
Expr::BindParameter(_)
| Expr::Distinct(_)

View File

@ -21,7 +21,6 @@ pub(crate) fn field_name(f: &Field) -> String {
Expr::Call { name, .. } => return name.clone(),
Expr::Nested(nested) => nested,
Expr::Binary { .. } => return binary_expr_name(&f.expr),
Expr::UnaryOp(_, nested) => nested,
Expr::Distinct(_) => return "distinct".to_string(),
Expr::VarRef { name, .. } => return name.deref().into(),
Expr::Wildcard(_) | Expr::BindParameter(_) | Expr::Literal(_) => return "".to_string(),

View File

@ -690,7 +690,7 @@ mod test {
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT -bytes_free::integer AS bytes_free FROM disk"
"SELECT -1 * bytes_free::integer AS bytes_free FROM disk"
);
// Call expressions

View File

@ -2,5 +2,5 @@
source: iox_query/src/plan/influxql.rs
expression: "plan(\"SELECT foo, atan2(f64_field, 2) FROM data\")"
---
Projection: data.time, data.foo AS foo, atan2(data.f64_field, UInt64(2)) AS atan2 [time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, atan2:Float64;N]
Projection: data.time, data.foo AS foo, atan2(data.f64_field, Int64(2)) AS atan2 [time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, atan2:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]

View File

@ -0,0 +1,82 @@
use std::sync::Arc;
use arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
use datafusion::{
error::DataFusionError,
logical_expr::{ScalarFunctionImplementation, ScalarUDF, Volatility},
prelude::create_udf,
};
use once_cell::sync::Lazy;
/// The name of the date_bin_gapfill UDF given to DataFusion.
pub const DATE_BIN_GAPFILL_UDF_NAME: &str = "date_bin_gapfill";
/// Implementation of date_bin_gapfill.
/// This function takes arguments identical to date_bin() but
/// will fill in gaps with nulls (or the last observed value
/// if used with locf).
/// This function will never have an actual implementation because it
/// is a placeholder for a custom plan node that does gap filling.
pub(crate) static DATE_BIN_GAPFILL: Lazy<Arc<ScalarUDF>> = Lazy::new(|| {
Arc::new(create_udf(
DATE_BIN_GAPFILL_UDF_NAME,
vec![
DataType::Interval(IntervalUnit::DayTime), // stride
DataType::Timestamp(TimeUnit::Nanosecond, None), // source
DataType::Timestamp(TimeUnit::Nanosecond, None), // origin
],
Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)),
Volatility::Volatile,
unimplemented_scalar_impl(DATE_BIN_GAPFILL_UDF_NAME),
))
});
fn unimplemented_scalar_impl(name: &'static str) -> ScalarFunctionImplementation {
Arc::new(move |_| {
Err(DataFusionError::NotImplemented(format!(
"{} is not yet implemented",
name
)))
})
}
#[cfg(test)]
mod test {
use arrow::array::{ArrayRef, TimestampNanosecondArray};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::{col, lit_timestamp_nano, Expr};
use datafusion::scalar::ScalarValue;
use datafusion_util::context_with_table;
use std::sync::Arc;
fn date_bin_gapfill(stride: Expr, source: Expr, origin: Expr) -> Expr {
crate::registry()
.udf(super::DATE_BIN_GAPFILL_UDF_NAME)
.expect("should be registered")
.call(vec![stride, source, origin])
}
fn lit_interval_milliseconds(v: i64) -> Expr {
Expr::Literal(ScalarValue::IntervalDayTime(Some(v)))
}
#[tokio::test]
async fn date_bin_gapfill_errs() -> Result<()> {
let times = Arc::new(TimestampNanosecondArray::from(vec![Some(1000)]));
let rb = RecordBatch::try_from_iter(vec![("time", times as ArrayRef)])?;
let ctx = context_with_table(rb);
let df = ctx.table("t")?.select(vec![date_bin_gapfill(
lit_interval_milliseconds(360_000),
col("time"),
lit_timestamp_nano(0),
)])?;
let res = df.collect().await;
let expected = "date_bin_gapfill is not yet implemented";
assert!(res
.expect_err("should be an error")
.to_string()
.contains(expected));
Ok(())
}
}

View File

@ -30,6 +30,9 @@ pub mod selectors;
/// window_bounds expressions
mod window;
/// gap filling expressions
mod gapfill;
/// Function registry
mod registry;

View File

@ -7,7 +7,7 @@ use datafusion::{
};
use once_cell::sync::Lazy;
use crate::{regex, window};
use crate::{gapfill, regex, window};
static REGISTRY: Lazy<IOxFunctionRegistry> = Lazy::new(IOxFunctionRegistry::new);
@ -24,6 +24,7 @@ impl IOxFunctionRegistry {
impl FunctionRegistry for IOxFunctionRegistry {
fn udfs(&self) -> HashSet<String> {
[
gapfill::DATE_BIN_GAPFILL_UDF_NAME,
regex::REGEX_MATCH_UDF_NAME,
regex::REGEX_NOT_MATCH_UDF_NAME,
window::WINDOW_BOUNDS_UDF_NAME,
@ -35,6 +36,7 @@ impl FunctionRegistry for IOxFunctionRegistry {
fn udf(&self, name: &str) -> DataFusionResult<Arc<ScalarUDF>> {
match name {
gapfill::DATE_BIN_GAPFILL_UDF_NAME => Ok(gapfill::DATE_BIN_GAPFILL.clone()),
regex::REGEX_MATCH_UDF_NAME => Ok(regex::REGEX_MATCH_UDF.clone()),
regex::REGEX_NOT_MATCH_UDF_NAME => Ok(regex::REGEX_NOT_MATCH_UDF.clone()),
window::WINDOW_BOUNDS_UDF_NAME => Ok(window::WINDOW_BOUNDS_UDF.clone()),

View File

@ -14,7 +14,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
observability_deps = { path = "../observability_deps" }
workspace-hack = { path = "../workspace-hack"}
async-trait = { version = "0.1.61", optional = true }
tokio = { version = "1.24.1", optional = true, default_features = false, features = ["time"] }
tokio = { version = "1.24.2", optional = true, default_features = false, features = ["time"] }
[features]
default = []