feat: Teach InfluxQL how to plan an aggregate query (#7230)

* feat: Display failed query

Allows a user to immediately identify the failed query.

* feat: API improvements to InfluxQL parser

* feat: Extend `SchemaProvider` trait to query for UDFs

* fix: We don't want the parser to panic on overflows

* fix: ensure `map_type` maps the timestamp data type

* feat: API to map an InfluxQL duration expression to a DataFusion interval

* chore: Copied APIs from DataFusion SQL planner

These APIs are private but useful for InfluxQL planning.

* feat: Initial aggregate query support

* feat: Add an API to fetch a field by name

* chore: Fixes to handling NULLs in aggregates

* chore: Add ability to test expected failures for InfluxQL

* chore: appease rustfmt and clippy 😬

* chore: produce same error as InfluxQL

* chore: appease clippy

* chore: Improve docs

* chore: Simplify aggregate and raw planning

* feat: Add support for GROUP BY TIME(stride, offset)

* chore: Update docs

* chore: remove redundant `is_empty` check

Co-authored-by: Christopher M. Wolff <chris.wolff@influxdata.com>

* chore: PR feedback to clarify purpose of function

* chore: The series_sort can't be empty, as `time` is always added

This was originally intended as an optimisation when executing an
aggregate query that did not group by time or tags, as it will produce
N rows, where N is the number of measurements queried.

* chore: update comment for clarity

---------

Co-authored-by: Christopher M. Wolff <chris.wolff@influxdata.com>
pull/24376/head
Stuart Carnie 2023-03-23 12:13:15 +11:00 committed by GitHub
parent 77948e3341
commit 08ef689d21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1903 additions and 621 deletions

View File

@ -289,7 +289,8 @@ impl Display for Duration {
fn single_duration(i: &str) -> ParseResult<&str, i64> {
use DurationUnit::*;
map(
map_fail(
"overflow",
pair(
integer,
alt((
@ -304,15 +305,18 @@ fn single_duration(i: &str) -> ParseResult<&str, i64> {
value(Week, tag("w")), // weeks
)),
),
|(v, unit)| match unit {
Nanosecond => v,
Microsecond => v * NANOS_PER_MICRO,
Millisecond => v * NANOS_PER_MILLI,
Second => v * NANOS_PER_SEC,
Minute => v * NANOS_PER_MIN,
Hour => v * NANOS_PER_HOUR,
Day => v * NANOS_PER_DAY,
Week => v * NANOS_PER_WEEK,
|(v, unit)| {
(match unit {
Nanosecond => Some(v),
Microsecond => v.checked_mul(NANOS_PER_MICRO),
Millisecond => v.checked_mul(NANOS_PER_MILLI),
Second => v.checked_mul(NANOS_PER_SEC),
Minute => v.checked_mul(NANOS_PER_MIN),
Hour => v.checked_mul(NANOS_PER_HOUR),
Day => v.checked_mul(NANOS_PER_DAY),
Week => v.checked_mul(NANOS_PER_WEEK),
})
.ok_or("integer overflow")
},
)(i)
}
@ -407,6 +411,8 @@ mod test {
// Fallible cases
integer("hello").unwrap_err();
integer("9223372036854775808").expect_err("expected overflow");
}
#[test]
@ -487,6 +493,11 @@ mod test {
let (_, got) = single_duration("5w").unwrap();
assert_eq!(got, 5 * NANOS_PER_WEEK);
// Fallible
// Handle overflow
single_duration("16000w").expect_err("expected overflow");
}
#[test]

View File

@ -70,6 +70,15 @@ pub struct SelectStatement {
pub timezone: Option<TimeZoneClause>,
}
impl SelectStatement {
/// Return the `FILL` behaviour for the `SELECT` statement.
///
/// The default when no `FILL` clause present is `FILL(null)`.
pub fn fill(&self) -> FillClause {
self.fill.unwrap_or_default()
}
}
impl Display for SelectStatement {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "SELECT {} {}", self.fields, self.from)?;
@ -242,6 +251,24 @@ impl Display for GroupByClause {
}
}
impl GroupByClause {
/// Returns the time dimension for the `GROUP BY` clause.
pub fn time_dimension(&self) -> Option<&TimeDimension> {
self.contents.iter().find_map(|dim| match dim {
Dimension::Time(t) => Some(t),
_ => None,
})
}
/// Returns an iterator of all the tag dimensions for the `GROUP BY` clause.
pub fn tags(&self) -> impl Iterator<Item = &Identifier> + '_ {
self.contents.iter().filter_map(|dim| match dim {
Dimension::Tag(i) => Some(i),
_ => None,
})
}
}
/// Used to parse the interval argument of the TIME function
struct TimeCallIntervalArgument;
@ -290,16 +317,30 @@ impl ArithmeticParsers for TimeCallOffsetArgument {
}
}
/// Represents a `TIME` dimension in a `GROUP BY` clause.
#[derive(Clone, Debug, PartialEq)]
pub struct TimeDimension {
/// The first argument of the `TIME` call.
pub interval: Expr,
/// An optional second argument to specify the offset applied to the `TIME` call.
pub offset: Option<Expr>,
}
impl Display for TimeDimension {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "TIME({}", self.interval)?;
if let Some(offset) = &self.offset {
write!(f, ", {offset}")?;
}
write!(f, ")")
}
}
/// Represents a dimension of a `GROUP BY` clause.
#[derive(Clone, Debug, PartialEq)]
pub enum Dimension {
/// Represents a `TIME` call in a `GROUP BY` clause.
Time {
/// The first argument of the `TIME` call.
interval: Expr,
/// An optional second argument to specify the offset applied to the `TIME` call.
offset: Option<Expr>,
},
Time(TimeDimension),
/// Represents a literal tag reference in a `GROUP BY` clause.
Tag(Identifier),
@ -314,11 +355,7 @@ pub enum Dimension {
impl Display for Dimension {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::Time {
interval,
offset: Some(offset),
} => write!(f, "TIME({interval}, {offset})"),
Self::Time { interval, .. } => write!(f, "TIME({interval})"),
Self::Time(v) => Display::fmt(v, f),
Self::Tag(v) => Display::fmt(v, f),
Self::Regex(v) => Display::fmt(v, f),
Self::Wildcard => f.write_char('*'),
@ -366,7 +403,7 @@ fn time_call_expression(i: &str) -> ParseResult<&str, Dimension> {
expect("invalid TIME call, expected ')'", preceded(ws0, char(')'))),
),
),
|(interval, offset)| Dimension::Time { interval, offset },
|(interval, offset)| Dimension::Time(TimeDimension { interval, offset }),
)(i)
}
@ -390,9 +427,12 @@ fn group_by_clause(i: &str) -> ParseResult<&str, GroupByClause> {
}
/// Represents a `FILL` clause, and specifies all possible cases of the argument to the `FILL` clause.
#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub enum FillClause {
/// Empty aggregate windows will contain null values and is specified as `fill(null)`
///
/// This is the default behavior of a `SELECT` statement, when the `FILL` clause is omitted.
#[default]
Null,
/// Empty aggregate windows will be discarded and is specified as `fill(none)`.
@ -704,6 +744,8 @@ mod test {
fn test_select_statement() {
let (_, got) = select_statement("SELECT value FROM foo").unwrap();
assert_eq!(got.to_string(), "SELECT value FROM foo");
// Assert default behaviour when `FILL` is omitted
assert_eq!(got.fill(), FillClause::Null);
let (_, got) =
select_statement(r#"SELECT f1,/f2/, f3 AS "a field" FROM foo WHERE host =~ /c1/"#)
@ -740,6 +782,7 @@ mod test {
got.to_string(),
r#"SELECT sum(value) FROM foo GROUP BY TIME(5m), host FILL(PREVIOUS)"#
);
assert_eq!(got.fill(), FillClause::Previous);
let (_, got) = select_statement("SELECT value FROM foo ORDER BY DESC").unwrap();
assert_eq!(
@ -1141,6 +1184,20 @@ mod test {
);
}
#[test]
fn test_group_by_clause_tags_time_dimension() {
let (_, got) = group_by_clause("GROUP BY *, /foo/, TIME(5m), tag1, tag2").unwrap();
assert!(got.time_dimension().is_some());
assert_eq!(
got.tags().cloned().collect::<Vec<_>>(),
vec!["tag1".into(), "tag2".into()]
);
let (_, got) = group_by_clause("GROUP BY *, /foo/").unwrap();
assert!(got.time_dimension().is_none());
assert_eq!(got.tags().count(), 0);
}
#[test]
fn test_time_call_expression() {
let (got, _) = time_call_expression("TIME(5m)").unwrap();

View File

@ -2,8 +2,8 @@
source: influxdb_influxql_parser/src/visit.rs
expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE host = \"node1\")\n WHERE region =~ /west/ AND value > 5\n GROUP BY TIME(5m), host\n FILL(previous)\n ORDER BY TIME DESC\n LIMIT 1 OFFSET 2\n SLIMIT 3 SOFFSET 4\n TZ('Australia/Hobart')\n \"#)"
---
- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }"
- "pre_visit_select_field: Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }"
- "pre_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }"
@ -66,14 +66,16 @@ expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE
- "post_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) }"
- "post_visit_conditional_expression: Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } }"
- "post_visit_where_clause: WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })"
- "pre_visit_group_by_clause: ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }"
- "pre_visit_select_dimension: Time { interval: Literal(Duration(Duration(300000000000))), offset: None }"
- "pre_visit_group_by_clause: ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }"
- "pre_visit_select_dimension: Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None })"
- "pre_visit_select_time_dimension: TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }"
- "pre_visit_expr: Literal(Duration(Duration(300000000000)))"
- "post_visit_expr: Literal(Duration(Duration(300000000000)))"
- "post_visit_select_dimension: Time { interval: Literal(Duration(Duration(300000000000))), offset: None }"
- "post_visit_select_time_dimension: TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }"
- "post_visit_select_dimension: Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None })"
- "pre_visit_select_dimension: Tag(Identifier(\"host\"))"
- "post_visit_select_dimension: Tag(Identifier(\"host\"))"
- "post_visit_group_by_clause: ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }"
- "post_visit_group_by_clause: ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }"
- "pre_visit_fill_clause: Previous"
- "post_visit_fill_clause: Previous"
- "pre_visit_order_by_clause: Descending"
@ -88,6 +90,6 @@ expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE
- "post_visit_soffset_clause: SOffsetClause(4)"
- "pre_visit_timezone_clause: TimeZoneClause(Australia/Hobart)"
- "post_visit_timezone_clause: TimeZoneClause(Australia/Hobart)"
- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"

View File

@ -2,8 +2,8 @@
source: influxdb_influxql_parser/src/visit_mut.rs
expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE host = \"node1\")\n WHERE region =~ /west/ AND value > 5\n GROUP BY TIME(5m), host\n FILL(previous)\n ORDER BY TIME DESC\n LIMIT 1 OFFSET 2\n SLIMIT 3 SOFFSET 4\n TZ('Australia/Hobart')\n \"#)"
---
- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "pre_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "pre_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "pre_visit_select_field_list: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }"
- "pre_visit_select_field: Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }"
- "pre_visit_expr: VarRef { name: Identifier(\"value\"), data_type: None }"
@ -66,14 +66,16 @@ expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE
- "post_visit_conditional_expression: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) }"
- "post_visit_conditional_expression: Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } }"
- "post_visit_where_clause: WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })"
- "pre_visit_group_by_clause: ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }"
- "pre_visit_select_dimension: Time { interval: Literal(Duration(Duration(300000000000))), offset: None }"
- "pre_visit_group_by_clause: ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }"
- "pre_visit_select_dimension: Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None })"
- "pre_visit_select_time_dimension: TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }"
- "pre_visit_expr: Literal(Duration(Duration(300000000000)))"
- "post_visit_expr: Literal(Duration(Duration(300000000000)))"
- "post_visit_select_dimension: Time { interval: Literal(Duration(Duration(300000000000))), offset: None }"
- "post_visit_select_time_dimension: TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }"
- "post_visit_select_dimension: Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None })"
- "pre_visit_select_dimension: Tag(Identifier(\"host\"))"
- "post_visit_select_dimension: Tag(Identifier(\"host\"))"
- "post_visit_group_by_clause: ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }"
- "post_visit_group_by_clause: ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }"
- "pre_visit_fill_clause: Previous"
- "post_visit_fill_clause: Previous"
- "pre_visit_order_by_clause: Descending"
@ -88,6 +90,6 @@ expression: "visit_statement!(r#\"SELECT value FROM (SELECT usage FROM cpu WHERE
- "post_visit_soffset_clause: SOffsetClause(4)"
- "pre_visit_timezone_clause: TimeZoneClause(Australia/Hobart)"
- "post_visit_timezone_clause: TimeZoneClause(Australia/Hobart)"
- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time { interval: Literal(Duration(Duration(300000000000))), offset: None }, Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"
- "post_visit_select_statement: SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) }"
- "post_visit_statement: Select(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"value\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Subquery(SelectStatement { fields: ZeroOrMore { contents: [Field { expr: VarRef { name: Identifier(\"usage\"), data_type: None }, alias: None }] }, from: ZeroOrMore { contents: [Name(QualifiedMeasurementName { database: None, retention_policy: None, name: Name(Identifier(\"cpu\")) })] }, condition: Some(WhereClause(Binary { lhs: Expr(VarRef { name: Identifier(\"host\"), data_type: None }), op: Eq, rhs: Expr(VarRef { name: Identifier(\"node1\"), data_type: None }) })), group_by: None, fill: None, order_by: None, limit: None, offset: None, series_limit: None, series_offset: None, timezone: None })] }, condition: Some(WhereClause(Binary { lhs: Binary { lhs: Expr(VarRef { name: Identifier(\"region\"), data_type: None }), op: EqRegex, rhs: Expr(Literal(Regex(Regex(\"west\")))) }, op: And, rhs: Binary { lhs: Expr(VarRef { name: Identifier(\"value\"), data_type: None }), op: Gt, rhs: Expr(Literal(Integer(5))) } })), group_by: Some(ZeroOrMore { contents: [Time(TimeDimension { interval: Literal(Duration(Duration(300000000000))), offset: None }), Tag(Identifier(\"host\"))] }), fill: Some(Previous), order_by: Some(Descending), limit: Some(LimitClause(1)), offset: Some(OffsetClause(2)), series_limit: Some(SLimitClause(3)), series_offset: Some(SOffsetClause(4)), timezone: Some(TimeZoneClause(Australia/Hobart)) })"

View File

@ -36,7 +36,8 @@ use crate::expression::arithmetic::Expr;
use crate::expression::conditional::ConditionalExpression;
use crate::select::{
Dimension, Field, FieldList, FillClause, FromMeasurementClause, GroupByClause,
MeasurementSelection, SLimitClause, SOffsetClause, SelectStatement, TimeZoneClause,
MeasurementSelection, SLimitClause, SOffsetClause, SelectStatement, TimeDimension,
TimeZoneClause,
};
use crate::show::{OnClause, ShowDatabasesStatement};
use crate::show_field_keys::ShowFieldKeysStatement;
@ -367,6 +368,19 @@ pub trait Visitor: Sized {
Ok(self)
}
/// Invoked before `TIME` dimension clause is visited.
fn pre_visit_select_time_dimension(
self,
_n: &TimeDimension,
) -> Result<Recursion<Self>, Self::Error> {
Ok(Continue(self))
}
/// Invoked after `TIME` dimension clause is visited.
fn post_visit_select_time_dimension(self, _n: &TimeDimension) -> Result<Self, Self::Error> {
Ok(self)
}
/// Invoked before any children of the `WHERE` clause are visited.
fn pre_visit_where_clause(self, _n: &WhereClause) -> Result<Recursion<Self>, Self::Error> {
Ok(Continue(self))
@ -1108,14 +1122,7 @@ impl Visitable for Dimension {
};
let visitor = match self {
Self::Time { interval, offset } => {
let visitor = interval.accept(visitor)?;
if let Some(offset) = offset {
offset.accept(visitor)
} else {
Ok(visitor)
}
}
Self::Time(v) => v.accept(visitor),
Self::Tag(_) | Self::Regex(_) | Self::Wildcard => Ok(visitor),
}?;
@ -1123,6 +1130,24 @@ impl Visitable for Dimension {
}
}
impl Visitable for TimeDimension {
    fn accept<V: Visitor>(&self, visitor: V) -> Result<V, V::Error> {
        // Give the visitor the opportunity to skip this subtree entirely.
        let visitor = match visitor.pre_visit_select_time_dimension(self)? {
            Continue(v) => v,
            Stop(v) => return Ok(v),
        };

        // Visit the mandatory interval expression, then the optional offset.
        let mut visitor = self.interval.accept(visitor)?;
        if let Some(offset) = &self.offset {
            visitor = offset.accept(visitor)?;
        }

        visitor.post_visit_select_time_dimension(self)
    }
}
impl Visitable for WithKeyClause {
fn accept<V: Visitor>(&self, visitor: V) -> Result<V, V::Error> {
let visitor = match visitor.pre_visit_with_key_clause(self)? {
@ -1218,7 +1243,8 @@ mod test {
use crate::expression::conditional::ConditionalExpression;
use crate::select::{
Dimension, Field, FieldList, FillClause, FromMeasurementClause, GroupByClause,
MeasurementSelection, SLimitClause, SOffsetClause, SelectStatement, TimeZoneClause,
MeasurementSelection, SLimitClause, SOffsetClause, SelectStatement, TimeDimension,
TimeZoneClause,
};
use crate::show::{OnClause, ShowDatabasesStatement};
use crate::show_field_keys::ShowFieldKeysStatement;
@ -1506,6 +1532,17 @@ mod test {
Ok(self.push_post("select_dimension", n))
}
fn pre_visit_select_time_dimension(
self,
n: &TimeDimension,
) -> Result<Recursion<Self>, Self::Error> {
Ok(Continue(self.push_pre("select_time_dimension", n)))
}
fn post_visit_select_time_dimension(self, n: &TimeDimension) -> Result<Self, Self::Error> {
Ok(self.push_post("select_time_dimension", n))
}
fn pre_visit_where_clause(self, n: &WhereClause) -> Result<Recursion<Self>, Self::Error> {
Ok(Continue(self.push_pre("where_clause", n)))
}

View File

@ -36,7 +36,8 @@ use crate::expression::arithmetic::Expr;
use crate::expression::conditional::ConditionalExpression;
use crate::select::{
Dimension, Field, FieldList, FillClause, FromMeasurementClause, GroupByClause,
MeasurementSelection, SLimitClause, SOffsetClause, SelectStatement, TimeZoneClause,
MeasurementSelection, SLimitClause, SOffsetClause, SelectStatement, TimeDimension,
TimeZoneClause,
};
use crate::show::{OnClause, ShowDatabasesStatement};
use crate::show_field_keys::ShowFieldKeysStatement;
@ -380,6 +381,22 @@ pub trait VisitorMut: Sized {
Ok(())
}
/// Invoked before `TIME` dimension clause is visited.
fn pre_visit_select_time_dimension(
&mut self,
_n: &mut TimeDimension,
) -> Result<Recursion, Self::Error> {
Ok(Continue)
}
/// Invoked after `TIME` dimension clause is visited.
fn post_visit_select_time_dimension(
&mut self,
_n: &mut TimeDimension,
) -> Result<(), Self::Error> {
Ok(())
}
/// Invoked before any children of the `WHERE` clause are visited.
fn pre_visit_where_clause(&mut self, _n: &mut WhereClause) -> Result<Recursion, Self::Error> {
Ok(Continue)
@ -1052,12 +1069,7 @@ impl VisitableMut for Dimension {
};
match self {
Self::Time { interval, offset } => {
interval.accept(visitor)?;
if let Some(offset) = offset {
offset.accept(visitor)?;
}
}
Self::Time(v) => v.accept(visitor)?,
Self::Tag(_) | Self::Regex(_) | Self::Wildcard => {}
};
@ -1065,6 +1077,21 @@ impl VisitableMut for Dimension {
}
}
impl VisitableMut for TimeDimension {
    fn accept<V: VisitorMut>(&mut self, visitor: &mut V) -> Result<(), V::Error> {
        // Allow the visitor to prune traversal of this subtree.
        match visitor.pre_visit_select_time_dimension(self)? {
            Stop => return Ok(()),
            Continue => {}
        }

        // Visit the mandatory interval expression, then the optional offset.
        self.interval.accept(visitor)?;
        if let Some(offset) = self.offset.as_mut() {
            offset.accept(visitor)?;
        }

        visitor.post_visit_select_time_dimension(self)
    }
}
impl VisitableMut for WithKeyClause {
fn accept<V: VisitorMut>(&mut self, visitor: &mut V) -> Result<(), V::Error> {
if let Stop = visitor.pre_visit_with_key_clause(self)? {
@ -1156,7 +1183,8 @@ mod test {
use crate::parse_statements;
use crate::select::{
Dimension, Field, FieldList, FillClause, FromMeasurementClause, GroupByClause,
MeasurementSelection, SLimitClause, SOffsetClause, SelectStatement, TimeZoneClause,
MeasurementSelection, SLimitClause, SOffsetClause, SelectStatement, TimeDimension,
TimeZoneClause,
};
use crate::show::{OnClause, ShowDatabasesStatement};
use crate::show_field_keys::ShowFieldKeysStatement;
@ -1498,6 +1526,22 @@ mod test {
Ok(())
}
fn pre_visit_select_time_dimension(
&mut self,
n: &mut TimeDimension,
) -> Result<Recursion, Self::Error> {
self.push_pre("select_time_dimension", n);
Ok(Continue)
}
fn post_visit_select_time_dimension(
&mut self,
n: &mut TimeDimension,
) -> Result<(), Self::Error> {
self.push_post("select_time_dimension", n);
Ok(())
}
fn pre_visit_where_clause(
&mut self,
n: &mut WhereClause,

View File

@ -271,9 +271,58 @@ SELECT cpu, usage_idle FROM cpu;
SELECT usage_idle FROM cpu GROUP BY cpu;
SELECT usage_idle, cpu FROM cpu GROUP BY cpu;
-- group by a non-existent tag
SELECT usage_idle FROM cpu GROUP BY cpu, non_existent;
-- group by and project a non-existent tag
SELECT usage_idle, non_existent FROM cpu GROUP BY cpu, non_existent;
-- multiple measurements and tags in the group by
SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu;
SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, non_existent;
SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, device;
SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY device, cpu;
SELECT usage_idle, bytes_free, device, cpu FROM cpu, disk GROUP BY device, cpu;
--
-- Aggregate queries
--
SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0;
SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0, non_existent;
SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY non_existent;
SELECT COUNT(f64), COUNT(f64) + COUNT(f64), COUNT(f64) * 3 FROM m0;
-- non-existent columns in an aggregate should evaluate to NULL
SELECT COUNT(f64) as the_count, SUM(non_existent) as foo FROM m0;
-- non-existent columns in an aggregate expression should evaluate to NULL
SELECT COUNT(f64) as the_count, SUM(f64) + SUM(non_existent) as foo FROM m0;
SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s) FILL(none);
-- supports offset parameter
SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s, 1s) FILL(none);
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk;
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk GROUP BY TIME(1s) FILL(none);
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk GROUP BY cpu;
SELECT COUNT(usage_idle) as count_usage_idle, COUNT(bytes_free) as count_bytes_free FROM cpu, disk WHERE cpu = 'cpu0' OR device = 'disk1s1' GROUP BY cpu;
-- measurements without any matching fields are omitted from the result set
SELECT SUM(usage_idle) FROM cpu, disk WHERE cpu = 'cpu0' GROUP BY cpu;
SELECT SUM(usage_idle) FROM cpu, disk GROUP BY cpu;
-- Fallible cases
-- Mixing aggregate and non-aggregate columns
SELECT COUNT(usage_idle) + usage_idle FROM cpu;
SELECT COUNT(usage_idle), usage_idle FROM cpu;
-- Unimplemented cases
-- TODO(sgc): No gap filling
-- Default FILL(null) when FILL is omitted
SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s);
SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s) FILL(previous);
-- LIMIT and OFFSET aren't supported with aggregates and groups
SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu LIMIT 1;
SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu OFFSET 1;

View File

@ -455,6 +455,28 @@
| cpu | 2022-10-31T02:00:00Z | 1.98 | cpu1 |
| cpu | 2022-10-31T02:00:10Z | 1.99 | cpu1 |
+------------------+----------------------+------------+-----------+
-- InfluxQL: SELECT usage_idle FROM cpu GROUP BY cpu, non_existent;
+------------------+----------------------+-----------+--------------+------------+
| iox::measurement | time | cpu | non_existent | usage_idle |
+------------------+----------------------+-----------+--------------+------------+
| cpu | 2022-10-31T02:00:00Z | cpu-total | | 2.98 |
| cpu | 2022-10-31T02:00:10Z | cpu-total | | 2.99 |
| cpu | 2022-10-31T02:00:00Z | cpu0 | | 0.98 |
| cpu | 2022-10-31T02:00:10Z | cpu0 | | 0.99 |
| cpu | 2022-10-31T02:00:00Z | cpu1 | | 1.98 |
| cpu | 2022-10-31T02:00:10Z | cpu1 | | 1.99 |
+------------------+----------------------+-----------+--------------+------------+
-- InfluxQL: SELECT usage_idle, non_existent FROM cpu GROUP BY cpu, non_existent;
+------------------+----------------------+-----------+------------+--------------+
| iox::measurement | time | cpu | usage_idle | non_existent |
+------------------+----------------------+-----------+------------+--------------+
| cpu | 2022-10-31T02:00:00Z | cpu-total | 2.98 | |
| cpu | 2022-10-31T02:00:10Z | cpu-total | 2.99 | |
| cpu | 2022-10-31T02:00:00Z | cpu0 | 0.98 | |
| cpu | 2022-10-31T02:00:10Z | cpu0 | 0.99 | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | 1.98 | |
| cpu | 2022-10-31T02:00:10Z | cpu1 | 1.99 | |
+------------------+----------------------+-----------+------------+--------------+
-- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu;
+------------------+----------------------+-----------+------------+------------+
| iox::measurement | time | cpu | usage_idle | bytes_free |
@ -472,6 +494,23 @@
| disk | 2022-10-31T02:00:10Z | | | 2239.0 |
| disk | 2022-10-31T02:00:10Z | | | 3239.0 |
+------------------+----------------------+-----------+------------+------------+
-- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, non_existent;
+------------------+----------------------+-----------+--------------+------------+------------+
| iox::measurement | time | cpu | non_existent | usage_idle | bytes_free |
+------------------+----------------------+-----------+--------------+------------+------------+
| cpu | 2022-10-31T02:00:00Z | cpu-total | | 2.98 | |
| cpu | 2022-10-31T02:00:10Z | cpu-total | | 2.99 | |
| cpu | 2022-10-31T02:00:00Z | cpu0 | | 0.98 | |
| cpu | 2022-10-31T02:00:10Z | cpu0 | | 0.99 | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | | 1.98 | |
| cpu | 2022-10-31T02:00:10Z | cpu1 | | 1.99 | |
| disk | 2022-10-31T02:00:00Z | | | | 1234.0 |
| disk | 2022-10-31T02:00:00Z | | | | 2234.0 |
| disk | 2022-10-31T02:00:00Z | | | | 3234.0 |
| disk | 2022-10-31T02:00:10Z | | | | 1239.0 |
| disk | 2022-10-31T02:00:10Z | | | | 2239.0 |
| disk | 2022-10-31T02:00:10Z | | | | 3239.0 |
+------------------+----------------------+-----------+--------------+------------+------------+
-- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, device;
+------------------+----------------------+-----------+---------+------------+------------+
| iox::measurement | time | cpu | device | usage_idle | bytes_free |
@ -522,4 +561,116 @@
| disk | 2022-10-31T02:00:10Z | | 2239.0 | disk1s2 | |
| disk | 2022-10-31T02:00:00Z | | 3234.0 | disk1s5 | |
| disk | 2022-10-31T02:00:10Z | | 3239.0 | disk1s5 | |
+------------------+----------------------+------------+------------+---------+-----------+
+------------------+----------------------+------------+------------+---------+-----------+
-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0;
+------------------+----------------------+-------+-------+------+-------------------+
| iox::measurement | time | tag0 | count | sum | stddev |
+------------------+----------------------+-------+-------+------+-------------------+
| m0 | 1970-01-01T00:00:00Z | val00 | 5 | 80.6 | 5.085961069453836 |
| m0 | 1970-01-01T00:00:00Z | val01 | 1 | 11.3 | |
| m0 | 1970-01-01T00:00:00Z | val02 | 1 | 10.4 | |
+------------------+----------------------+-------+-------+------+-------------------+
-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0, non_existent;
+------------------+----------------------+--------------+-------+-------+------+-------------------+
| iox::measurement | time | non_existent | tag0 | count | sum | stddev |
+------------------+----------------------+--------------+-------+-------+------+-------------------+
| m0 | 1970-01-01T00:00:00Z | | val00 | 5 | 80.6 | 5.085961069453836 |
| m0 | 1970-01-01T00:00:00Z | | val01 | 1 | 11.3 | |
| m0 | 1970-01-01T00:00:00Z | | val02 | 1 | 10.4 | |
+------------------+----------------------+--------------+-------+-------+------+-------------------+
-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY non_existent;
+------------------+----------------------+--------------+-------+--------------------+--------------------+
| iox::measurement | time | non_existent | count | sum | stddev |
+------------------+----------------------+--------------+-------+--------------------+--------------------+
| m0 | 1970-01-01T00:00:00Z | | 7 | 102.30000000000001 | 4.8912945019454614 |
+------------------+----------------------+--------------+-------+--------------------+--------------------+
-- InfluxQL: SELECT COUNT(f64), COUNT(f64) + COUNT(f64), COUNT(f64) * 3 FROM m0;
+------------------+----------------------+-------+---------------------+-----------+
| iox::measurement | time | count | count_f64_count_f64 | count_f64 |
+------------------+----------------------+-------+---------------------+-----------+
| m0 | 1970-01-01T00:00:00Z | 7 | 14 | 21 |
+------------------+----------------------+-------+---------------------+-----------+
-- InfluxQL: SELECT COUNT(f64) as the_count, SUM(non_existent) as foo FROM m0;
+------------------+----------------------+-----------+-----+
| iox::measurement | time | the_count | foo |
+------------------+----------------------+-----------+-----+
| m0 | 1970-01-01T00:00:00Z | 7 | |
+------------------+----------------------+-----------+-----+
-- InfluxQL: SELECT COUNT(f64) as the_count, SUM(f64) + SUM(non_existent) as foo FROM m0;
+------------------+----------------------+-----------+-----+
| iox::measurement | time | the_count | foo |
+------------------+----------------------+-----------+-----+
| m0 | 1970-01-01T00:00:00Z | 7 | |
+------------------+----------------------+-----------+-----+
-- InfluxQL: SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s) FILL(none);
+------------------+----------------------+-------+------+
| iox::measurement | time | count | sum |
+------------------+----------------------+-------+------+
| m0 | 2022-10-31T02:00:00Z | 6 | 83.1 |
| m0 | 2022-10-31T02:00:30Z | 1 | 19.2 |
+------------------+----------------------+-------+------+
-- InfluxQL: SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s, 1s) FILL(none);
+------------------+----------------------+-------+--------------------+
| iox::measurement | time | count | sum |
+------------------+----------------------+-------+--------------------+
| m0 | 2022-10-31T01:59:31Z | 3 | 31.799999999999997 |
| m0 | 2022-10-31T02:00:01Z | 4 | 70.5 |
+------------------+----------------------+-------+--------------------+
-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk;
+------------------+----------------------+-------+---------+
| iox::measurement | time | count | count_1 |
+------------------+----------------------+-------+---------+
| cpu | 1970-01-01T00:00:00Z | 6 | |
| disk | 1970-01-01T00:00:00Z | | 6 |
+------------------+----------------------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk GROUP BY TIME(1s) FILL(none);
+------------------+----------------------+-------+---------+
| iox::measurement | time | count | count_1 |
+------------------+----------------------+-------+---------+
| cpu | 2022-10-31T02:00:00Z | 3 | |
| cpu | 2022-10-31T02:00:10Z | 3 | |
| disk | 2022-10-31T02:00:00Z | | 3 |
| disk | 2022-10-31T02:00:10Z | | 3 |
+------------------+----------------------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk GROUP BY cpu;
+------------------+----------------------+-----------+-------+---------+
| iox::measurement | time | cpu | count | count_1 |
+------------------+----------------------+-----------+-------+---------+
| cpu | 1970-01-01T00:00:00Z | cpu-total | 2 | |
| cpu | 1970-01-01T00:00:00Z | cpu0 | 2 | |
| cpu | 1970-01-01T00:00:00Z | cpu1 | 2 | |
| disk | 1970-01-01T00:00:00Z | | | 6 |
+------------------+----------------------+-----------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle) as count_usage_idle, COUNT(bytes_free) as count_bytes_free FROM cpu, disk WHERE cpu = 'cpu0' OR device = 'disk1s1' GROUP BY cpu;
+------------------+----------------------+------+------------------+------------------+
| iox::measurement | time | cpu | count_usage_idle | count_bytes_free |
+------------------+----------------------+------+------------------+------------------+
| cpu | 1970-01-01T00:00:00Z | cpu0 | 2 | |
| disk | 1970-01-01T00:00:00Z | | | 2 |
+------------------+----------------------+------+------------------+------------------+
-- InfluxQL: SELECT SUM(usage_idle) FROM cpu, disk WHERE cpu = 'cpu0' GROUP BY cpu;
+------------------+----------------------+------+------+
| iox::measurement | time | cpu | sum |
+------------------+----------------------+------+------+
| cpu | 1970-01-01T00:00:00Z | cpu0 | 1.97 |
+------------------+----------------------+------+------+
-- InfluxQL: SELECT SUM(usage_idle) FROM cpu, disk GROUP BY cpu;
+------------------+----------------------+-----------+--------------------+
| iox::measurement | time | cpu | sum |
+------------------+----------------------+-----------+--------------------+
| cpu | 1970-01-01T00:00:00Z | cpu-total | 5.970000000000001 |
| cpu | 1970-01-01T00:00:00Z | cpu0 | 1.97 |
| cpu | 1970-01-01T00:00:00Z | cpu1 | 3.9699999999999998 |
+------------------+----------------------+-----------+--------------------+
-- InfluxQL: SELECT COUNT(usage_idle) + usage_idle FROM cpu;
Error while planning query: Error during planning: mixing aggregate and non-aggregate columns is not supported
-- InfluxQL: SELECT COUNT(usage_idle), usage_idle FROM cpu;
Error while planning query: Error during planning: mixing aggregate and non-aggregate columns is not supported
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s);
Error while planning query: This feature is not implemented: FILL(NULL)
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s) FILL(previous);
Error while planning query: This feature is not implemented: FILL(PREVIOUS)
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu LIMIT 1;
Error while planning query: This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu OFFSET 1;
Error while planning query: This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause

View File

@ -9,8 +9,8 @@ use std::sync::Arc;
use crate::plan::{parse_regex, InfluxQLToLogicalPlan, SchemaProvider};
use datafusion::common::Statistics;
use datafusion::datasource::provider_as_source;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{LogicalPlan, TableSource};
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::logical_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::{Partitioning, SendableRecordBatchStream};
use datafusion::{
@ -25,11 +25,12 @@ use iox_query::exec::IOxSessionContext;
use observability_deps::tracing::debug;
use schema::Schema;
struct ContextSchemaProvider {
struct ContextSchemaProvider<'a> {
state: &'a SessionState,
tables: HashMap<String, (Arc<dyn TableSource>, Schema)>,
}
impl SchemaProvider for ContextSchemaProvider {
impl<'a> SchemaProvider for ContextSchemaProvider<'a> {
fn get_table_provider(&self, name: &str) -> Result<Arc<dyn TableSource>> {
self.tables
.get(name)
@ -37,6 +38,14 @@ impl SchemaProvider for ContextSchemaProvider {
.ok_or_else(|| DataFusionError::Plan(format!("measurement does not exist: {name}")))
}
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.state.scalar_functions().get(name).cloned()
}
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
self.state.aggregate_functions().get(name).cloned()
}
fn table_names(&self) -> Vec<&'_ str> {
self.tables.keys().map(|k| k.as_str()).collect::<Vec<_>>()
}
@ -171,6 +180,7 @@ impl InfluxQLQueryPlanner {
let query_tables = find_all_measurements(&statement, &names)?;
let mut sp = ContextSchemaProvider {
state: &ctx.inner().state(),
tables: HashMap::with_capacity(query_tables.len()),
};

View File

@ -36,12 +36,10 @@ pub(crate) fn map_type(
field: &str,
) -> Result<Option<VarRefDataType>> {
match s.table_schema(measurement_name) {
Some(iox) => Ok(match iox.find_index_of(field) {
Some(i) => match iox.field(i).0 {
InfluxColumnType::Field(ft) => Some(field_type_to_var_ref_data_type(ft)),
InfluxColumnType::Tag => Some(VarRefDataType::Tag),
InfluxColumnType::Timestamp => None,
},
Some(iox) => Ok(match iox.field_by_name(field) {
Some((InfluxColumnType::Field(ft), _)) => Some(field_type_to_var_ref_data_type(ft)),
Some((InfluxColumnType::Tag, _)) => Some(VarRefDataType::Tag),
Some((InfluxColumnType::Timestamp, _)) => Some(VarRefDataType::Timestamp),
None => None,
}),
None => Ok(None),
@ -87,6 +85,10 @@ mod test {
map_type(&namespace, "cpu", "host").unwrap(),
Some(VarRefDataType::Tag)
);
assert_matches!(
map_type(&namespace, "cpu", "time").unwrap(),
Some(VarRefDataType::Timestamp)
);
// Returns None for nonexistent field
assert!(map_type(&namespace, "cpu", "nonexistent")
.unwrap()

View File

@ -8,6 +8,7 @@ mod rewriter;
mod test_utils;
mod timestamp;
mod util;
mod util_copy;
mod var_ref;
pub use planner::InfluxQLToLogicalPlan;

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,141 @@
use arrow::datatypes::DataType;
use datafusion::common::{DFSchemaRef, DataFusionError, Result};
use datafusion::logical_expr::utils::find_column_exprs;
use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_util::AsExpr;
use generated_types::influxdata::iox::querier::v1::influx_ql_metadata::TagKeyColumn;
use influxdb_influxql_parser::common::OrderByClause;
use influxdb_influxql_parser::expression::{Expr as IQLExpr, VarRefDataType};
use influxdb_influxql_parser::select::{Field, SelectStatement};
use schema::INFLUXQL_MEASUREMENT_COLUMN_NAME;
use std::collections::HashMap;
use std::ops::Deref;
/// Verifies that every [`Expr::Column`] reference found in `exprs` is
/// present in `columns`.
///
/// # Errors
///
/// * [`DataFusionError::Internal`] if `columns` contains anything other
///   than [`Expr::Column`] expressions (a programming error in the caller).
/// * [`DataFusionError::Plan`] if `exprs` references a column outside
///   `columns`, i.e. the projection mixes aggregate and non-aggregate
///   columns.
pub(crate) fn check_exprs_satisfy_columns(columns: &[Expr], exprs: &[Expr]) -> Result<()> {
    if columns.iter().any(|c| !matches!(c, Expr::Column(_))) {
        return Err(DataFusionError::Internal(
            "expected Expr::Column".to_owned(),
        ));
    }

    if find_column_exprs(exprs)
        .iter()
        .any(|expr| !columns.contains(expr))
    {
        return Err(DataFusionError::Plan(
            "mixing aggregate and non-aggregate columns is not supported".to_owned(),
        ));
    }

    Ok(())
}
/// Builds the [`TagKeyColumn`] metadata describing where each tag key of
/// `tag_set` appears in the final column projection, and whether that tag
/// was explicitly projected in the `SELECT` list.
///
/// `tag_set` and `is_projected` are parallel slices: `is_projected[i]`
/// states whether `tag_set[i]` also appears in the projection.
pub(super) fn make_tag_key_column_meta(
    fields: &[Field],
    tag_set: &[&str],
    is_projected: &[bool],
) -> Vec<TagKeyColumn> {
    /// Offset added to a field's position in `fields` to obtain its index
    /// in the column projection list.
    ///
    /// NOTE(review): the original comment claimed the start index is 2
    /// (a [INFLUXQL_MEASUREMENT_COLUMN_NAME] and a `time` column preceding
    /// the fields), but the constant is 1 — which implies only the
    /// measurement column precedes the fields here (i.e. `time` is already
    /// counted within `fields`). Confirm against the planner's projection
    /// order.
    const START_INDEX: usize = 1;

    // Create a map of tag key columns to their respective index in the projection.
    // Only variable references that are tags — or untyped, which may resolve
    // to tags — are candidates.
    let index_map = fields
        .iter()
        .enumerate()
        .filter_map(|(index, f)| match &f.expr {
            IQLExpr::VarRef {
                name,
                data_type: Some(VarRefDataType::Tag) | None,
            } => Some((name.deref().as_str(), index + START_INDEX)),
            _ => None,
        })
        .collect::<HashMap<_, _>>();

    // tag_set was previously sorted, so tag_key_columns will be in the correct order
    tag_set
        .iter()
        .zip(is_projected)
        .map(|(tag_key, is_projected)| TagKeyColumn {
            tag_key: (*tag_key).to_owned(),
            column_index: *index_map.get(*tag_key).unwrap() as _,
            is_projected: *is_projected,
        })
        .collect()
}
/// Create a plan that sorts the input plan.
///
/// The ordering of the results is as follows:
///
/// iox::measurement, [group by tag 0, .., group by tag n], time, [projection tag 0, .., projection tag n]
///
/// ## NOTE
///
/// Sort expressions referring to tag keys are always specified in lexicographically ascending order.
pub(super) fn plan_with_sort(
    plan: LogicalPlan,
    select: &SelectStatement,
    group_by_tag_set: &[&str],
    projection_tag_set: &[&str],
) -> Result<LogicalPlan> {
    // If there are multiple measurements, we need to sort by the measurement column
    // NOTE: Ideally DataFusion would maintain the order of the UNION ALL, which would eliminate
    // the need to sort by measurement.
    // See: https://github.com/influxdata/influxdb_iox/issues/7062
    let mut series_sort = if matches!(plan, LogicalPlan::Union(_)) {
        vec![Expr::sort(
            INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr(),
            true,
            false,
        )]
    } else {
        vec![]
    };

    /// Map the fields to DataFusion [`Expr::Sort`] expressions, excluding those columns that
    /// are [`DataType::Null`]'s, as sorting these column types is not supported and unnecessary.
    fn map_to_expr<'a>(
        schema: &'a DFSchemaRef,
        fields: &'a [&str],
    ) -> impl Iterator<Item = Expr> + 'a {
        fields
            .iter()
            .filter(|f| {
                // Fields absent from the schema cannot be sorted either,
                // so they are silently skipped.
                if let Ok(df) = schema.field_with_unqualified_name(f) {
                    *df.data_type() != DataType::Null
                } else {
                    false
                }
            })
            .map(|f| Expr::sort(f.as_expr(), true, false))
    }

    let schema = plan.schema();

    if !group_by_tag_set.is_empty() {
        // Adding `LIMIT` or `OFFSET` with a `GROUP BY tag, ...` clause is not supported
        //
        // See: https://github.com/influxdata/influxdb_iox/issues/6920
        if select.offset.is_some() || select.limit.is_some() {
            return Err(DataFusionError::NotImplemented(
                "GROUP BY combined with LIMIT or OFFSET clause".to_owned(),
            ));
        }

        // GROUP BY tags sort ahead of `time` so each series is contiguous.
        series_sort.extend(map_to_expr(schema, group_by_tag_set));
    };

    // `time` is always part of the sort key; its direction follows the
    // ORDER BY clause, defaulting to ascending when omitted.
    series_sort.push(Expr::sort(
        "time".as_expr(),
        match select.order_by {
            // Default behaviour is to sort by time in ascending order if there is no ORDER BY
            None | Some(OrderByClause::Ascending) => true,
            Some(OrderByClause::Descending) => false,
        },
        false,
    ));

    // Finally, sort by any tags that are projected but not grouped on.
    series_sort.extend(map_to_expr(schema, projection_tag_set));

    LogicalPlanBuilder::from(plan).sort(series_sort)?.build()
}

View File

@ -1,5 +1,6 @@
use crate::plan::timestamp::parse_timestamp;
use crate::plan::util::binary_operator_to_df_operator;
use arrow::temporal_conversions::MILLISECONDS_IN_DAY;
use datafusion::common::{DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{binary_expr, lit, now, BinaryExpr, Expr as DFExpr, Operator};
use influxdb_influxql_parser::expression::BinaryOperator;
@ -70,6 +71,48 @@ pub(in crate::plan) fn time_range_to_df_expr(expr: &Expr, tz: Option<chrono_tz::
})
}
/// Simplifies `expr` to an InfluxQL duration and returns a DataFusion interval.
///
/// Returns an error if `expr` is not a duration expression.
///
/// ## NOTE
///
/// The returned interval is limited to a precision of milliseconds,
/// due to [issue #7204][]
///
/// [issue #7204]: https://github.com/influxdata/influxdb_iox/issues/7204
pub(super) fn expr_to_df_interval_dt(expr: &Expr) -> ExprResult {
    let nanos = duration_expr_to_nanoseconds(expr)?;

    // Sub-millisecond precision cannot be represented by an IntervalDayTime.
    if nanos % 1_000_000 != 0 {
        return Err(DataFusionError::NotImplemented("interval limited to a precision of milliseconds. See https://github.com/influxdata/influxdb_iox/issues/7204".to_owned()));
    }

    let total_millis = nanos / 1_000_000;
    let days = total_millis / MILLISECONDS_IN_DAY;
    // keep the sign on `days` and remove it from `millis`
    let millis = (total_millis - days * MILLISECONDS_IN_DAY).abs();

    // It is not possible for an InfluxQL duration to overflow an IntervalDayTime:
    // an InfluxQL duration encodes a number of nanoseconds into a 64-bit signed
    // integer, which is a maximum of 15,250.2845 days, while an IntervalDayTime
    // encodes days as a signed 32-bit number.
    Ok(lit(ScalarValue::new_interval_dt(
        days as i32,
        millis as i32,
    )))
}
/// Reduces an InfluxQL duration `expr` to a nanosecond interval.
pub(super) fn duration_expr_to_nanoseconds(expr: &Expr) -> Result<i64> {
    match reduce_expr(expr, None)? {
        // `IntervalMonthDayNano` packs months/days/nanoseconds into an i128;
        // truncating to i64 keeps the low 64 bits, i.e. the nanosecond part.
        // NOTE(review): this assumes `reduce_expr` never produces month/day
        // components for a duration — confirm against its implementation.
        DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(v))) => Ok(v as i64),
        DFExpr::Literal(ScalarValue::Float64(Some(v))) => Ok(v as i64),
        DFExpr::Literal(ScalarValue::Int64(Some(v))) => Ok(v),
        _ => Err(DataFusionError::Plan("invalid duration expression".into())),
    }
}
fn map_expr_err(expr: &Expr) -> impl Fn(DataFusionError) -> DataFusionError + '_ {
move |err| {
DataFusionError::Plan(format!(
@ -393,6 +436,7 @@ fn parse_timestamp_df_expr(s: &str, tz: Option<chrono_tz::Tz>) -> ExprResult {
#[cfg(test)]
mod test {
use super::*;
use assert_matches::assert_matches;
use influxdb_influxql_parser::expression::ConditionalExpression;
use test_helpers::assert_error;
@ -545,4 +589,33 @@ mod test {
"TimestampNanosecond(1081505100123456789, None)" // 2004-04-09T10:05:00.123456789Z
);
}
#[test]
fn test_expr_to_df_interval_dt() {
// Helper: parse an InfluxQL conditional expression and convert it to a
// DataFusion interval literal via `expr_to_df_interval_dt`.
fn parse(s: &str) -> ExprResult {
let expr = s
.parse::<ConditionalExpression>()
.unwrap()
.expr()
.unwrap()
.clone();
expr_to_df_interval_dt(&expr)
}
use ScalarValue::IntervalDayTime;
// Durations of millisecond precision or coarser map to day/millisecond intervals.
assert_matches!(parse("10s").unwrap(), DFExpr::Literal(IntervalDayTime(v)) if IntervalDayTime(v) == ScalarValue::new_interval_dt(0, 10_000));
assert_matches!(parse("10s + 1d").unwrap(), DFExpr::Literal(IntervalDayTime(v)) if IntervalDayTime(v) == ScalarValue::new_interval_dt(1, 10_000));
assert_matches!(parse("5d10ms").unwrap(), DFExpr::Literal(IntervalDayTime(v)) if IntervalDayTime(v) == ScalarValue::new_interval_dt(5, 10));
// Negative durations keep the sign on the day component only.
assert_matches!(parse("-2d10ms").unwrap(), DFExpr::Literal(IntervalDayTime(v)) if IntervalDayTime(v) == ScalarValue::new_interval_dt(-2, 10));
// Fallible
use DataFusionError::NotImplemented;
// Don't support a precision greater than milliseconds.
//
// See: https://github.com/influxdata/influxdb_iox/issues/7204
assert_error!(parse("-2d10ns"), NotImplemented(ref s) if s == "interval limited to a precision of milliseconds. See https://github.com/influxdata/influxdb_iox/issues/7204");
}
}

View File

@ -180,9 +180,11 @@ fn has_wildcards(stmt: &SelectStatement) -> (bool, bool) {
/// Rewrite the projection list and GROUP BY of the specified `SELECT` statement.
///
/// Wildcards and regular expressions in the `SELECT` projection list and `GROUP BY` are expanded.
/// Any fields with no type specifier are rewritten with the appropriate type, if they exist in the
/// underlying schema.
/// The following transformations are performed:
///
/// * Wildcards and regular expressions in the `SELECT` projection list and `GROUP BY` are expanded.
/// * Any fields with no type specifier are rewritten with the appropriate type, if they exist in the
/// underlying schema.
///
/// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L1185).
fn rewrite_field_list(s: &dyn SchemaProvider, stmt: &mut SelectStatement) -> Result<()> {

View File

@ -5,7 +5,7 @@ use crate::plan::SchemaProvider;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
use datafusion::datasource::empty::EmptyTable;
use datafusion::datasource::provider_as_source;
use datafusion::logical_expr::TableSource;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, TableSource};
use influxdb_influxql_parser::parse_statements;
use influxdb_influxql_parser::select::{Field, SelectStatement};
use influxdb_influxql_parser::statement::Statement;
@ -159,6 +159,14 @@ impl SchemaProvider for MockSchemaProvider {
.ok_or_else(|| DataFusionError::Plan(format!("measurement does not exist: {name}")))
}
/// Look up a scalar user-defined function by name.
///
/// The mock schema provider registers no scalar UDFs, so this always
/// returns `None`.
fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
None
}
/// Look up an aggregate user-defined function by name.
///
/// The mock schema provider registers no aggregate UDFs, so this always
/// returns `None`.
fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
None
}
fn table_names(&self) -> Vec<&'_ str> {
self.tables
.keys()

View File

@ -0,0 +1,337 @@
// NOTE: This code is copied from DataFusion, as it is not public,
// so all warnings are disabled.
#![allow(warnings)]
#![allow(clippy::all)]
//! A collection of utility functions copied from DataFusion.
//!
//! If these APIs are stabilised and made public, they can be removed from IOx.
//!
//! NOTE: Keep these implementations in sync with the upstream DataFusion
//! sources linked from each item's documentation.
use datafusion::common::{DataFusionError, Result};
use datafusion::logical_expr::{
expr::{
AggregateFunction, Between, BinaryExpr, Case, Cast, Expr, GetIndexedField, GroupingSet,
Like, Sort, TryCast, WindowFunction,
},
utils::expr_as_column_expr,
LogicalPlan,
};
/// Rebuilds an `Expr` as a projection on top of a collection of `Expr`'s.
///
/// For example, the expression `a + b < 1` would require, as input, the 2
/// individual columns, `a` and `b`. But, if the base expressions already
/// contain the `a + b` result, then that may be used in lieu of the `a` and
/// `b` columns.
///
/// This is useful in the context of a query like:
///
/// SELECT a + b < 1 ... GROUP BY a + b
///
/// where post-aggregation, `a + b` need not be a projection against the
/// individual columns `a` and `b`, but rather it is a projection against the
/// `a + b` found in the GROUP BY.
///
/// Source: <https://github.com/apache/arrow-datafusion/blob/e6d71068474f3b2ef9ad5e9af85f56f0d0560a1b/datafusion/sql/src/utils.rs#L63>
pub(crate) fn rebase_expr(expr: &Expr, base_exprs: &[Expr], plan: &LogicalPlan) -> Result<Expr> {
    // Any sub-expression present in `base_exprs` is replaced by a column
    // reference to the matching output of `plan`; all other nodes are cloned.
    clone_with_replacement(expr, &|nested_expr| {
        match base_exprs.contains(nested_expr) {
            true => expr_as_column_expr(nested_expr, plan).map(Some),
            false => Ok(None),
        }
    })
}
/// Returns a cloned `Expr`, but any of the `Expr`'s in the tree may be
/// replaced/customized by the replacement function.
///
/// The replacement function is called repeatedly with `Expr`, starting with
/// the argument `expr`, then descending depth-first through its
/// descendants. The function chooses to replace or keep (clone) each `Expr`.
///
/// The function's return type is `Result<Option<Expr>>>`, where:
///
/// * `Ok(Some(replacement_expr))`: A replacement `Expr` is provided; it is
///   swapped in at the particular node in the tree. Any nested `Expr` are
///   not subject to cloning/replacement.
/// * `Ok(None)`: A replacement `Expr` is not provided. The `Expr` is
///   recreated, with all of its nested `Expr`'s subject to
///   cloning/replacement.
/// * `Err(err)`: Any error returned by the function is returned as-is by
///   `clone_with_replacement()`.
///
/// Source: <https://github.com/apache/arrow-datafusion/blob/26e1b20ea/datafusion/sql/src/utils.rs#L153>
fn clone_with_replacement<F>(expr: &Expr, replacement_fn: &F) -> Result<Expr>
where
F: Fn(&Expr) -> Result<Option<Expr>>,
{
let replacement_opt = replacement_fn(expr)?;
match replacement_opt {
// If we were provided a replacement, use the replacement. Do not
// descend further.
Some(replacement) => Ok(replacement),
// No replacement was provided, clone the node and recursively call
// clone_with_replacement() on any nested expressions.
None => {
match expr {
// Composite nodes: rebuild the node, recursing into every nested `Expr`.
// NOTE(review): `filter` is cloned rather than recursed — this matches
// the upstream DataFusion source at the pinned revision.
Expr::AggregateFunction(AggregateFunction {
fun,
args,
distinct,
filter,
}) => Ok(Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
args.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
*distinct,
filter.clone(),
))),
Expr::WindowFunction(WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
}) => Ok(Expr::WindowFunction(WindowFunction::new(
fun.clone(),
args.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<_>>>()?,
partition_by
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<_>>>()?,
order_by
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<_>>>()?,
window_frame.clone(),
))),
Expr::AggregateUDF { fun, args, filter } => Ok(Expr::AggregateUDF {
fun: fun.clone(),
args: args
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
filter: filter.clone(),
}),
Expr::Alias(nested_expr, alias_name) => Ok(Expr::Alias(
Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
alias_name.clone(),
)),
Expr::Between(Between {
expr,
negated,
low,
high,
}) => Ok(Expr::Between(Between::new(
Box::new(clone_with_replacement(expr, replacement_fn)?),
*negated,
Box::new(clone_with_replacement(low, replacement_fn)?),
Box::new(clone_with_replacement(high, replacement_fn)?),
))),
Expr::InList {
expr: nested_expr,
list,
negated,
} => Ok(Expr::InList {
expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
list: list
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
negated: *negated,
}),
Expr::BinaryExpr(BinaryExpr { left, right, op }) => {
Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(clone_with_replacement(left, replacement_fn)?),
*op,
Box::new(clone_with_replacement(right, replacement_fn)?),
)))
}
Expr::Like(Like {
negated,
expr,
pattern,
escape_char,
}) => Ok(Expr::Like(Like::new(
*negated,
Box::new(clone_with_replacement(expr, replacement_fn)?),
Box::new(clone_with_replacement(pattern, replacement_fn)?),
*escape_char,
))),
Expr::ILike(Like {
negated,
expr,
pattern,
escape_char,
}) => Ok(Expr::ILike(Like::new(
*negated,
Box::new(clone_with_replacement(expr, replacement_fn)?),
Box::new(clone_with_replacement(pattern, replacement_fn)?),
*escape_char,
))),
Expr::SimilarTo(Like {
negated,
expr,
pattern,
escape_char,
}) => Ok(Expr::SimilarTo(Like::new(
*negated,
Box::new(clone_with_replacement(expr, replacement_fn)?),
Box::new(clone_with_replacement(pattern, replacement_fn)?),
*escape_char,
))),
Expr::Case(case) => Ok(Expr::Case(Case::new(
match &case.expr {
Some(case_expr) => {
Some(Box::new(clone_with_replacement(case_expr, replacement_fn)?))
}
None => None,
},
case.when_then_expr
.iter()
.map(|(a, b)| {
Ok((
Box::new(clone_with_replacement(a, replacement_fn)?),
Box::new(clone_with_replacement(b, replacement_fn)?),
))
})
.collect::<Result<Vec<(_, _)>>>()?,
match &case.else_expr {
Some(else_expr) => {
Some(Box::new(clone_with_replacement(else_expr, replacement_fn)?))
}
None => None,
},
))),
Expr::ScalarFunction { fun, args } => Ok(Expr::ScalarFunction {
fun: fun.clone(),
args: args
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
}),
Expr::ScalarUDF { fun, args } => Ok(Expr::ScalarUDF {
fun: fun.clone(),
args: args
.iter()
.map(|arg| clone_with_replacement(arg, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
}),
// Unary wrappers: recurse into the single nested expression.
Expr::Negative(nested_expr) => Ok(Expr::Negative(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::Not(nested_expr) => Ok(Expr::Not(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsNotNull(nested_expr) => Ok(Expr::IsNotNull(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::IsNull(nested_expr) => Ok(Expr::IsNull(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsTrue(nested_expr) => Ok(Expr::IsTrue(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsFalse(nested_expr) => Ok(Expr::IsFalse(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsUnknown(nested_expr) => Ok(Expr::IsUnknown(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::IsNotTrue(nested_expr) => Ok(Expr::IsNotTrue(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::IsNotFalse(nested_expr) => Ok(Expr::IsNotFalse(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::IsNotUnknown(nested_expr) => Ok(Expr::IsNotUnknown(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::Cast(Cast { expr, data_type }) => Ok(Expr::Cast(Cast::new(
Box::new(clone_with_replacement(expr, replacement_fn)?),
data_type.clone(),
))),
Expr::TryCast(TryCast {
expr: nested_expr,
data_type,
}) => Ok(Expr::TryCast(TryCast::new(
Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
data_type.clone(),
))),
Expr::Sort(Sort {
expr: nested_expr,
asc,
nulls_first,
}) => Ok(Expr::Sort(Sort::new(
Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
*asc,
*nulls_first,
))),
// Leaf expressions contain no nested `Expr` values, so a plain clone
// suffices.
Expr::Column { .. }
| Expr::OuterReferenceColumn(_, _)
| Expr::Literal(_)
| Expr::ScalarVariable(_, _)
| Expr::Exists { .. }
| Expr::ScalarSubquery(_) => Ok(expr.clone()),
// NOTE(review): only the outer expression is recursed; the subquery's
// plan is cloned unchanged, matching upstream DataFusion.
Expr::InSubquery {
expr: nested_expr,
subquery,
negated,
} => Ok(Expr::InSubquery {
expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
subquery: subquery.clone(),
negated: *negated,
}),
Expr::Wildcard => Ok(Expr::Wildcard),
Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(clone_with_replacement(expr.as_ref(), replacement_fn)?),
key.clone(),
)))
}
// Grouping sets: recurse into each expression list.
Expr::GroupingSet(set) => match set {
GroupingSet::Rollup(exprs) => Ok(Expr::GroupingSet(GroupingSet::Rollup(
exprs
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
))),
GroupingSet::Cube(exprs) => Ok(Expr::GroupingSet(GroupingSet::Cube(
exprs
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
))),
GroupingSet::GroupingSets(lists_of_exprs) => {
let mut new_lists_of_exprs = vec![];
for exprs in lists_of_exprs {
new_lists_of_exprs.push(
exprs
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
);
}
Ok(Expr::GroupingSet(GroupingSet::GroupingSets(
new_lists_of_exprs,
)))
}
},
Expr::Placeholder { id, data_type } => Ok(Expr::Placeholder {
id: id.clone(),
data_type: data_type.clone(),
}),
}
}
}
}

View File

@ -282,6 +282,12 @@ impl Schema {
)
}
/// Return the InfluxDB data model type, if any, and underlying arrow
/// schema field for the column identified by `name`, or `None` if no such
/// column exists.
pub fn field_by_name(&self, name: &str) -> Option<(InfluxColumnType, &ArrowField)> {
    let index = self.find_index_of(name)?;
    Some(self.field(index))
}
/// Find the index of the column with the given name, if any.
pub fn find_index_of(&self, name: &str) -> Option<usize> {
self.inner.index_of(name).ok()

View File

@ -11,6 +11,7 @@ use influxdb_iox_client::{
};
use mutable_batch_lp::lines_to_batches;
use mutable_batch_pb::encode::encode_write;
use std::fmt::Display;
use tonic::IntoRequest;
/// Writes the line protocol to the write_base/api/v2/write endpoint (typically on the router)
@ -129,11 +130,11 @@ pub async fn run_sql(
///
/// Use [`try_run_influxql`] if you want to check the error manually.
pub async fn run_influxql(
influxql: impl Into<String>,
influxql: impl Into<String> + Clone + Display,
namespace: impl Into<String>,
querier_connection: Connection,
) -> Vec<RecordBatch> {
try_run_influxql(influxql, namespace, querier_connection)
try_run_influxql(influxql.clone(), namespace, querier_connection)
.await
.expect("Error executing influxql query")
.unwrap_or_else(|_| panic!("Error executing InfluxQL query: {influxql}"))
}

View File

@ -1,12 +1,14 @@
mod queries;
use crate::{run_influxql, run_sql, snapshot_comparison::queries::TestQueries, MiniCluster};
use crate::{run_sql, snapshot_comparison::queries::TestQueries, try_run_influxql, MiniCluster};
use arrow_flight::error::FlightError;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
fmt::{Display, Formatter},
fs,
path::{Path, PathBuf},
};
use tonic::Code;
use self::queries::Query;
@ -227,12 +229,21 @@ async fn run_query(
.await
}
Language::InfluxQL => {
run_influxql(
match try_run_influxql(
query_text,
cluster.namespace(),
cluster.querier().querier_grpc_connection(),
)
.await
{
Ok(results) => results,
Err(influxdb_iox_client::flight::Error::ArrowFlightError(FlightError::Tonic(
status,
))) if status.code() == Code::InvalidArgument => {
return Ok(vec![status.message().to_owned()])
}
Err(err) => return Ok(vec![err.to_string()]),
}
}
};