Merge pull request #7753 from influxdata/sgc/issue/7739_select_ir_02

refactor: Use intermediate representation of `SELECT` in InfluxQL planner
pull/24376/head
kodiakhq[bot] 2023-05-05 06:06:09 +00:00 committed by GitHub
commit 2ba5fb4759
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 588 additions and 564 deletions

View File

@ -45,7 +45,7 @@ pub struct SelectStatement {
pub group_by: Option<GroupByClause>, pub group_by: Option<GroupByClause>,
/// The [fill] clause specifies the fill behaviour for the selection. If the value is [`None`], /// The [fill] clause specifies the fill behaviour for the selection. If the value is [`None`],
/// it is the same behavior as `fill(none)`. /// it is the same behavior as `fill(null)`.
/// ///
/// [fill]: https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#group-by-time-intervals-and-fill /// [fill]: https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#group-by-time-intervals-and-fill
pub fill: Option<FillClause>, pub fill: Option<FillClause>,

View File

@ -1003,11 +1003,11 @@ tags: non_existent=
+---------------------+-------+--------------------+--------------------+ +---------------------+-------+--------------------+--------------------+
-- InfluxQL: SELECT COUNT(f64), COUNT(f64) + COUNT(f64), COUNT(f64) * 3 FROM m0; -- InfluxQL: SELECT COUNT(f64), COUNT(f64) + COUNT(f64), COUNT(f64) * 3 FROM m0;
name: m0 name: m0
+---------------------+-------+---------------------+-----------+ +---------------------+-------+-------------+---------+
| time | count | count_f64_count_f64 | count_f64 | | time | count | count_count | count_1 |
+---------------------+-------+---------------------+-----------+ +---------------------+-------+-------------+---------+
| 1970-01-01T00:00:00 | 7 | 14 | 21 | | 1970-01-01T00:00:00 | 7 | 14 | 21 |
+---------------------+-------+---------------------+-----------+ +---------------------+-------+-------------+---------+
-- InfluxQL: SELECT COUNT(f64) as the_count, SUM(non_existent) as foo FROM m0; -- InfluxQL: SELECT COUNT(f64) as the_count, SUM(non_existent) as foo FROM m0;
name: m0 name: m0
+---------------------+-----------+-----+ +---------------------+-----------+-----+
@ -1052,31 +1052,31 @@ name: disk
+---------------------+------+--------+ +---------------------+------+--------+
-- InfluxQL: SELECT MEAN(usage_idle) + MEAN(bytes_free) FROM cpu, disk; -- InfluxQL: SELECT MEAN(usage_idle) + MEAN(bytes_free) FROM cpu, disk;
name: cpu name: cpu
+---------------------+---------------------------------+ +---------------------+-----------+
| time | mean_usage_idle_mean_bytes_free | | time | mean_mean |
+---------------------+---------------------------------+ +---------------------+-----------+
| 1970-01-01T00:00:00 | | | 1970-01-01T00:00:00 | |
+---------------------+---------------------------------+ +---------------------+-----------+
name: disk name: disk
+---------------------+---------------------------------+ +---------------------+-----------+
| time | mean_usage_idle_mean_bytes_free | | time | mean_mean |
+---------------------+---------------------------------+ +---------------------+-----------+
| 1970-01-01T00:00:00 | | | 1970-01-01T00:00:00 | |
+---------------------+---------------------------------+ +---------------------+-----------+
-- InfluxQL: SELECT MEAN(usage_idle) + MEAN(foo) FROM cpu; -- InfluxQL: SELECT MEAN(usage_idle) + MEAN(foo) FROM cpu;
name: cpu name: cpu
+---------------------+--------------------------+ +---------------------+-----------+
| time | mean_usage_idle_mean_foo | | time | mean_mean |
+---------------------+--------------------------+ +---------------------+-----------+
| 1970-01-01T00:00:00 | | | 1970-01-01T00:00:00 | |
+---------------------+--------------------------+ +---------------------+-----------+
-- InfluxQL: SELECT MEAN(usage_idle), MEAN(usage_idle) + MEAN(foo) FROM cpu; -- InfluxQL: SELECT MEAN(usage_idle), MEAN(usage_idle) + MEAN(foo) FROM cpu;
name: cpu name: cpu
+---------------------+--------------------+--------------------------+ +---------------------+--------------------+-----------+
| time | mean | mean_usage_idle_mean_foo | | time | mean | mean_mean |
+---------------------+--------------------+--------------------------+ +---------------------+--------------------+-----------+
| 1970-01-01T00:00:00 | 1.9850000000000003 | | | 1970-01-01T00:00:00 | 1.9850000000000003 | |
+---------------------+--------------------+--------------------------+ +---------------------+--------------------+-----------+
-- InfluxQL: SELECT MEAN(foo) FROM cpu; -- InfluxQL: SELECT MEAN(foo) FROM cpu;
++ ++
++ ++
@ -1084,47 +1084,47 @@ name: cpu
-- InfluxQL: SELECT MEAN(usage_idle) + MEAN(foo) FROM cpu GROUP BY cpu; -- InfluxQL: SELECT MEAN(usage_idle) + MEAN(foo) FROM cpu GROUP BY cpu;
name: cpu name: cpu
tags: cpu=cpu-total tags: cpu=cpu-total
+---------------------+--------------------------+ +---------------------+-----------+
| time | mean_usage_idle_mean_foo | | time | mean_mean |
+---------------------+--------------------------+ +---------------------+-----------+
| 1970-01-01T00:00:00 | | | 1970-01-01T00:00:00 | |
+---------------------+--------------------------+ +---------------------+-----------+
name: cpu name: cpu
tags: cpu=cpu0 tags: cpu=cpu0
+---------------------+--------------------------+ +---------------------+-----------+
| time | mean_usage_idle_mean_foo | | time | mean_mean |
+---------------------+--------------------------+ +---------------------+-----------+
| 1970-01-01T00:00:00 | | | 1970-01-01T00:00:00 | |
+---------------------+--------------------------+ +---------------------+-----------+
name: cpu name: cpu
tags: cpu=cpu1 tags: cpu=cpu1
+---------------------+--------------------------+ +---------------------+-----------+
| time | mean_usage_idle_mean_foo | | time | mean_mean |
+---------------------+--------------------------+ +---------------------+-----------+
| 1970-01-01T00:00:00 | | | 1970-01-01T00:00:00 | |
+---------------------+--------------------------+ +---------------------+-----------+
-- InfluxQL: SELECT MEAN(usage_idle), MEAN(usage_idle) + MEAN(foo) FROM cpu GROUP BY cpu; -- InfluxQL: SELECT MEAN(usage_idle), MEAN(usage_idle) + MEAN(foo) FROM cpu GROUP BY cpu;
name: cpu name: cpu
tags: cpu=cpu-total tags: cpu=cpu-total
+---------------------+--------------------+--------------------------+ +---------------------+--------------------+-----------+
| time | mean | mean_usage_idle_mean_foo | | time | mean | mean_mean |
+---------------------+--------------------+--------------------------+ +---------------------+--------------------+-----------+
| 1970-01-01T00:00:00 | 2.9850000000000003 | | | 1970-01-01T00:00:00 | 2.9850000000000003 | |
+---------------------+--------------------+--------------------------+ +---------------------+--------------------+-----------+
name: cpu name: cpu
tags: cpu=cpu0 tags: cpu=cpu0
+---------------------+-------+--------------------------+ +---------------------+-------+-----------+
| time | mean | mean_usage_idle_mean_foo | | time | mean | mean_mean |
+---------------------+-------+--------------------------+ +---------------------+-------+-----------+
| 1970-01-01T00:00:00 | 0.985 | | | 1970-01-01T00:00:00 | 0.985 | |
+---------------------+-------+--------------------------+ +---------------------+-------+-----------+
name: cpu name: cpu
tags: cpu=cpu1 tags: cpu=cpu1
+---------------------+--------------------+--------------------------+ +---------------------+--------------------+-----------+
| time | mean | mean_usage_idle_mean_foo | | time | mean | mean_mean |
+---------------------+--------------------+--------------------------+ +---------------------+--------------------+-----------+
| 1970-01-01T00:00:00 | 1.9849999999999999 | | | 1970-01-01T00:00:00 | 1.9849999999999999 | |
+---------------------+--------------------+--------------------------+ +---------------------+--------------------+-----------+
-- InfluxQL: SELECT MEAN(foo) FROM cpu GROUP BY cpu; -- InfluxQL: SELECT MEAN(foo) FROM cpu GROUP BY cpu;
++ ++
++ ++

View File

@ -1,6 +1,6 @@
use crate::plan::field::field_by_name; use crate::plan::field::field_by_name;
use crate::plan::field_mapper::map_type; use crate::plan::field_mapper::map_type;
use crate::plan::ir::TableReference; use crate::plan::ir::DataSource;
use crate::plan::{error, SchemaProvider}; use crate::plan::{error, SchemaProvider};
use datafusion::common::Result; use datafusion::common::Result;
use influxdb_influxql_parser::expression::{ use influxdb_influxql_parser::expression::{
@ -16,22 +16,25 @@ use itertools::Itertools;
pub(super) fn evaluate_type( pub(super) fn evaluate_type(
s: &dyn SchemaProvider, s: &dyn SchemaProvider,
expr: &Expr, expr: &Expr,
from: &[TableReference], from: &[DataSource],
) -> Result<Option<VarRefDataType>> { ) -> Result<Option<VarRefDataType>> {
TypeEvaluator::new(from, s).eval_type(expr) TypeEvaluator::new(s, from).eval_type(expr)
} }
struct TypeEvaluator<'a> { /// Evaluate the type of the specified expression.
///
/// Derived from [Go implementation](https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4796-L4797).
pub(super) struct TypeEvaluator<'a> {
s: &'a dyn SchemaProvider, s: &'a dyn SchemaProvider,
from: &'a [TableReference], from: &'a [DataSource],
} }
impl<'a> TypeEvaluator<'a> { impl<'a> TypeEvaluator<'a> {
fn new(from: &'a [TableReference], s: &'a dyn SchemaProvider) -> Self { pub(super) fn new(s: &'a dyn SchemaProvider, from: &'a [DataSource]) -> Self {
Self { from, s } Self { from, s }
} }
fn eval_type(&self, expr: &Expr) -> Result<Option<VarRefDataType>> { pub(super) fn eval_type(&self, expr: &Expr) -> Result<Option<VarRefDataType>> {
Ok(match expr { Ok(match expr {
Expr::VarRef(v) => self.eval_var_ref(v)?, Expr::VarRef(v) => self.eval_var_ref(v)?,
Expr::Call(v) => self.eval_call(v)?, Expr::Call(v) => self.eval_call(v)?,
@ -80,9 +83,10 @@ impl<'a> TypeEvaluator<'a> {
} }
} }
/// Returns the type for the specified [`Expr`]. /// Returns the type for the specified [`VarRef`].
///
/// This function assumes that the expression has already been reduced. /// This function assumes that the expression has already been reduced.
fn eval_var_ref(&self, expr: &VarRef) -> Result<Option<VarRefDataType>> { pub(super) fn eval_var_ref(&self, expr: &VarRef) -> Result<Option<VarRefDataType>> {
Ok(match expr.data_type { Ok(match expr.data_type {
Some(dt) Some(dt)
if matches!( if matches!(
@ -100,7 +104,7 @@ impl<'a> TypeEvaluator<'a> {
let mut data_type: Option<VarRefDataType> = None; let mut data_type: Option<VarRefDataType> = None;
for tr in self.from.iter() { for tr in self.from.iter() {
match tr { match tr {
TableReference::Name(name) => match ( DataSource::Table(name) => match (
data_type, data_type,
map_type(self.s, name.as_str(), expr.name.as_str())?, map_type(self.s, name.as_str(), expr.name.as_str())?,
) { ) {
@ -112,7 +116,7 @@ impl<'a> TypeEvaluator<'a> {
(None, Some(res)) => data_type = Some(res), (None, Some(res)) => data_type = Some(res),
_ => continue, _ => continue,
}, },
TableReference::Subquery(select) => { DataSource::Subquery(select) => {
// find the field by name // find the field by name
if let Some(field) = field_by_name(&select.fields, expr.name.as_str()) { if let Some(field) = field_by_name(&select.fields, expr.name.as_str()) {
match (data_type, evaluate_type(self.s, &field.expr, &select.from)?) match (data_type, evaluate_type(self.s, &field.expr, &select.from)?)

View File

@ -66,7 +66,7 @@ impl<'a> Visitor for BinaryExprNameVisitor<'a> {
fn pre_visit_call(self, n: &Call) -> Result<Recursion<Self>, Self::Error> { fn pre_visit_call(self, n: &Call) -> Result<Recursion<Self>, Self::Error> {
self.0.push(n.name.clone()); self.0.push(n.name.clone());
Ok(Recursion::Continue(self)) Ok(Recursion::Stop(self))
} }
} }
@ -102,7 +102,7 @@ mod test {
assert_eq!(field_name(&f), "count"); assert_eq!(field_name(&f), "count");
let f = get_first_field("SELECT COUNT(usage) + SUM(usage_idle) FROM cpu"); let f = get_first_field("SELECT COUNT(usage) + SUM(usage_idle) FROM cpu");
assert_eq!(field_name(&f), "count_usage_sum_usage_idle"); assert_eq!(field_name(&f), "count_sum");
let f = get_first_field("SELECT 1+2 FROM cpu"); let f = get_first_field("SELECT 1+2 FROM cpu");
assert_eq!(field_name(&f), ""); assert_eq!(field_name(&f), "");

View File

@ -5,7 +5,6 @@ use influxdb_influxql_parser::common::{
LimitClause, MeasurementName, OffsetClause, OrderByClause, QualifiedMeasurementName, LimitClause, MeasurementName, OffsetClause, OrderByClause, QualifiedMeasurementName,
WhereClause, WhereClause,
}; };
use influxdb_influxql_parser::expression::ConditionalExpression;
use influxdb_influxql_parser::select::{ use influxdb_influxql_parser::select::{
Field, FieldList, FillClause, FromMeasurementClause, GroupByClause, MeasurementSelection, Field, FieldList, FillClause, FromMeasurementClause, GroupByClause, MeasurementSelection,
SelectStatement, TimeZoneClause, SelectStatement, TimeZoneClause,
@ -19,17 +18,17 @@ pub(super) struct Select {
/// Projection clause of the selection. /// Projection clause of the selection.
pub(super) fields: Vec<Field>, pub(super) fields: Vec<Field>,
/// A list of tables or subqueries used as the source data for the selection. /// A list of data sources for the selection.
pub(super) from: Vec<TableReference>, pub(super) from: Vec<DataSource>,
/// A conditional expression to filter the selection. /// A conditional expression to filter the selection.
pub(super) condition: Option<ConditionalExpression>, pub(super) condition: Option<WhereClause>,
/// The GROUP BY clause of the selection. /// The GROUP BY clause of the selection.
pub(super) group_by: Option<GroupByClause>, pub(super) group_by: Option<GroupByClause>,
/// The [fill] clause specifies the fill behaviour for the selection. If the value is [`None`], /// The [fill] clause specifies the fill behaviour for the selection. If the value is [`None`],
/// it is the same behavior as `fill(none)`. /// it is the same behavior as `fill(null)`.
/// ///
/// [fill]: https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#group-by-time-intervals-and-fill /// [fill]: https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#group-by-time-intervals-and-fill
pub(super) fill: Option<FillClause>, pub(super) fill: Option<FillClause>,
@ -38,10 +37,10 @@ pub(super) struct Select {
pub(super) order_by: Option<OrderByClause>, pub(super) order_by: Option<OrderByClause>,
/// A value to restrict the number of rows returned. /// A value to restrict the number of rows returned.
pub(super) limit: Option<u64>, pub(super) limit: Option<LimitClause>,
/// A value to specify an offset to start retrieving rows. /// A value to specify an offset to start retrieving rows.
pub(super) offset: Option<u64>, pub(super) offset: Option<OffsetClause>,
/// The timezone for the query, specified as [`tz('<time zone>')`][time_zone_clause]. /// The timezone for the query, specified as [`tz('<time zone>')`][time_zone_clause].
/// ///
@ -58,25 +57,25 @@ impl From<Select> for SelectStatement {
.from .from
.into_iter() .into_iter()
.map(|tr| match tr { .map(|tr| match tr {
TableReference::Name(name) => { DataSource::Table(name) => {
MeasurementSelection::Name(QualifiedMeasurementName { MeasurementSelection::Name(QualifiedMeasurementName {
database: None, database: None,
retention_policy: None, retention_policy: None,
name: MeasurementName::Name(name.as_str().into()), name: MeasurementName::Name(name.as_str().into()),
}) })
} }
TableReference::Subquery(q) => { DataSource::Subquery(q) => {
MeasurementSelection::Subquery(Box::new((*q).into())) MeasurementSelection::Subquery(Box::new((*q).into()))
} }
}) })
.collect(), .collect(),
), ),
condition: value.condition.map(WhereClause::new), condition: value.condition,
group_by: value.group_by, group_by: value.group_by,
fill: value.fill, fill: value.fill,
order_by: value.order_by, order_by: value.order_by,
limit: value.limit.map(LimitClause::new), limit: value.limit,
offset: value.offset.map(OffsetClause::new), offset: value.offset,
series_limit: None, series_limit: None,
series_offset: None, series_offset: None,
timezone: value.timezone.map(TimeZoneClause::new), timezone: value.timezone.map(TimeZoneClause::new),
@ -84,9 +83,9 @@ impl From<Select> for SelectStatement {
} }
} }
/// Represents a concrete reference to a table in a [`Select`] from clause. /// Represents a data source that is either a table or a subquery in a [`Select`] from clause.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(super) enum TableReference { pub(super) enum DataSource {
Name(String), Table(String),
Subquery(Box<Select>), Subquery(Box<Select>),
} }

View File

@ -1,5 +1,6 @@
mod select; mod select;
use crate::plan::ir::{DataSource, Select};
use crate::plan::planner::select::{ use crate::plan::planner::select::{
check_exprs_satisfy_columns, fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort, check_exprs_satisfy_columns, fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort,
}; };
@ -43,9 +44,7 @@ use influxdb_influxql_parser::expression::{
use influxdb_influxql_parser::functions::{ use influxdb_influxql_parser::functions::{
is_aggregate_function, is_now_function, is_scalar_math_function, is_aggregate_function, is_now_function, is_scalar_math_function,
}; };
use influxdb_influxql_parser::select::{ use influxdb_influxql_parser::select::{FillClause, GroupByClause};
FillClause, GroupByClause, SLimitClause, SOffsetClause, TimeZoneClause,
};
use influxdb_influxql_parser::show_field_keys::ShowFieldKeysStatement; use influxdb_influxql_parser::show_field_keys::ShowFieldKeysStatement;
use influxdb_influxql_parser::show_measurements::{ use influxdb_influxql_parser::show_measurements::{
ShowMeasurementsStatement, WithMeasurementClause, ShowMeasurementsStatement, WithMeasurementClause,
@ -56,9 +55,8 @@ use influxdb_influxql_parser::simple_from_clause::ShowFromClause;
use influxdb_influxql_parser::{ use influxdb_influxql_parser::{
common::{MeasurementName, WhereClause}, common::{MeasurementName, WhereClause},
expression::Expr as IQLExpr, expression::Expr as IQLExpr,
identifier::Identifier,
literal::Literal, literal::Literal,
select::{Field, FromMeasurementClause, MeasurementSelection, SelectStatement}, select::{Field, SelectStatement},
statement::Statement, statement::Statement,
}; };
use iox_query::config::{IoxConfigExt, MetadataCutoff}; use iox_query::config::{IoxConfigExt, MetadataCutoff};
@ -75,7 +73,7 @@ use schema::{
InfluxColumnType, InfluxFieldType, Schema, INFLUXQL_MEASUREMENT_COLUMN_NAME, InfluxColumnType, InfluxFieldType, Schema, INFLUXQL_MEASUREMENT_COLUMN_NAME,
INFLUXQL_METADATA_KEY, INFLUXQL_METADATA_KEY,
}; };
use std::collections::{HashSet, VecDeque}; use std::collections::HashSet;
use std::fmt::Debug; use std::fmt::Debug;
use std::iter; use std::iter;
use std::ops::{Bound, ControlFlow, Deref, Range}; use std::ops::{Bound, ControlFlow, Deref, Range};
@ -155,12 +153,11 @@ impl<'a> Context<'a> {
Self { scope, ..*self } Self { scope, ..*self }
} }
fn with_timezone(&self, timezone: Option<TimeZoneClause>) -> Self { fn with_timezone(&self, tz: Option<Tz>) -> Self {
let tz = timezone.as_deref().cloned();
Self { tz, ..*self } Self { tz, ..*self }
} }
fn with_group_by_fill(&self, select: &'a SelectStatement) -> Self { fn with_group_by_fill(&self, select: &'a Select) -> Self {
Self { Self {
group_by: select.group_by.as_ref(), group_by: select.group_by.as_ref(),
fill: select.fill, fill: select.fill,
@ -269,14 +266,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
) )
} }
fn rewrite_select_statement(&self, select: SelectStatement) -> Result<SelectStatement> { fn rewrite_select_statement(&self, select: SelectStatement) -> Result<Select> {
rewrite_statement(self.s, &select) rewrite_statement(self.s, &select)
} }
/// Create a [`LogicalPlan`] from the specified InfluxQL `SELECT` statement. /// Create a [`LogicalPlan`] from the specified InfluxQL `SELECT` statement.
fn select_statement_to_plan(&self, select: &SelectStatement) -> Result<LogicalPlan> { fn select_statement_to_plan(&self, select: &Select) -> Result<LogicalPlan> {
let mut plans = self.plan_from_tables(&select.from)?;
let ctx = Context::new(select_statement_info(select)?) let ctx = Context::new(select_statement_info(select)?)
.with_timezone(select.timezone) .with_timezone(select.timezone)
.with_group_by_fill(select); .with_group_by_fill(select);
@ -341,65 +336,37 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fields.extend(fields_no_time.iter().cloned()); fields.extend(fields_no_time.iter().cloned());
// Build the first non-empty plan
let plan = { let plan = {
loop { let mut iter = select.from.iter();
match plans.pop_front() { let plan = match iter.next() {
Some((plan, proj)) => match self.project_select( Some(ds) => self.project_select(&ctx, ds, select, &fields, &group_by_tag_set),
&ctx, None => {
// empty result, but let's at least have all the strictly necessary metadata
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
INFLUXQL_MEASUREMENT_COLUMN_NAME,
(&InfluxColumnType::Tag).into(),
false,
)]));
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: schema.to_dfschema_ref()?,
});
let plan = plan_with_metadata(
plan, plan,
proj, &InfluxQlMetadata {
select, measurement_column_index: MEASUREMENT_COLUMN_INDEX,
&fields, tag_key_columns: vec![],
&group_by_tag_set, },
)? { )?;
// Exclude any plans that produce no data, which is return Ok(plan);
// consistent with InfluxQL.
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
..
}) => continue,
plan => break plan,
},
None => {
// empty result, but let's at least have all the strictly necessary metadata
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
INFLUXQL_MEASUREMENT_COLUMN_NAME,
(&InfluxColumnType::Tag).into(),
false,
)]));
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: schema.to_dfschema_ref()?,
});
let plan = plan_with_metadata(
plan,
&InfluxQlMetadata {
measurement_column_index: MEASUREMENT_COLUMN_INDEX,
tag_key_columns: vec![],
},
)?;
return Ok(plan);
}
} }
} }?;
};
// UNION the remaining plans iter.try_fold(plan, |prev, ds| {
let plan = plans.into_iter().try_fold(plan, |prev, (next, proj)| { let next = self.project_select(&ctx, ds, select, &fields, &group_by_tag_set)?;
let next = self.project_select(&ctx, next, proj, select, &fields, &group_by_tag_set)?;
if let LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
..
}) = next
{
// Exclude any plans that produce no data, which is
// consistent with InfluxQL.
Ok(prev)
} else {
LogicalPlanBuilder::from(prev).union(next)?.build() LogicalPlanBuilder::from(prev).union(next)?.build()
} })?
})?; };
let plan = plan_with_metadata( let plan = plan_with_metadata(
plan, plan,
@ -451,23 +418,22 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
&projection_tag_set, &projection_tag_set,
)?; )?;
let plan = self.slimit(plan, select.series_offset, select.series_limit)?;
Ok(plan) Ok(plan)
} }
fn project_select( fn project_select(
&self, &self,
ctx: &Context<'_>, ctx: &Context<'_>,
input: LogicalPlan, ds: &DataSource,
proj: Vec<Expr>, select: &Select,
select: &SelectStatement,
fields: &[Field], fields: &[Field],
group_by_tag_set: &[&str], group_by_tag_set: &[&str],
) -> Result<LogicalPlan> { ) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?; let (plan, proj) = self.plan_from_data_source(ds)?;
let plan = self.plan_where_clause(ctx, &select.condition, input, &schemas)?; let schemas = Schemas::new(plan.schema())?;
let plan = self.plan_where_clause(ctx, &select.condition, plan, &schemas)?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions. // Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let mut select_exprs = self.field_list_to_exprs(ctx, &plan, fields, &schemas)?; let mut select_exprs = self.field_list_to_exprs(ctx, &plan, fields, &schemas)?;
@ -864,27 +830,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
} }
} }
/// Verifies the `SLIMIT` and `SOFFSET` clauses are `None`; otherwise, return a
/// `NotImplemented` error.
///
/// ## Why?
/// * `SLIMIT` and `SOFFSET` don't work as expected per issue [#7571]
/// * This issue [is noted](https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#the-slimit-clause) in our official documentation
///
/// [#7571]: https://github.com/influxdata/influxdb/issues/7571
fn slimit(
&self,
input: LogicalPlan,
offset: Option<SOffsetClause>,
limit: Option<SLimitClause>,
) -> Result<LogicalPlan> {
if offset.is_none() && limit.is_none() {
return Ok(input);
}
error::not_implemented("SLIMIT or SOFFSET")
}
/// Map the InfluxQL `SELECT` projection list into a list of DataFusion expressions. /// Map the InfluxQL `SELECT` projection list into a list of DataFusion expressions.
fn field_list_to_exprs( fn field_list_to_exprs(
&self, &self,
@ -1001,8 +946,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
name, name,
data_type: opt_dst_type, data_type: opt_dst_type,
}) => { }) => {
let name = normalize_identifier(name); Ok(match (ctx.scope, name.deref().as_str()) {
Ok(match (ctx.scope, name.as_str()) {
// Per the Go implementation, the time column is case-insensitive in the // Per the Go implementation, the time column is case-insensitive in the
// `WHERE` clause and disregards any postfix type cast operator. // `WHERE` clause and disregards any postfix type cast operator.
// //
@ -1261,42 +1205,31 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
/// Generate a list of logical plans for each of the tables references in the `FROM` /// Generate a list of logical plans for each of the tables references in the `FROM`
/// clause. /// clause.
fn plan_from_tables( fn plan_from_data_source(&self, ds: &DataSource) -> Result<(LogicalPlan, Vec<Expr>)> {
&self, match ds {
from: &FromMeasurementClause, DataSource::Table(table_name) => {
) -> Result<VecDeque<(LogicalPlan, Vec<Expr>)>> { // `rewrite_statement` guarantees the table should exist
// A list of scans and their initial projections let source = self.s.get_table_provider(table_name)?;
let mut table_projs = VecDeque::new(); let table_ref = TableReference::bare(table_name.to_owned());
for ms in from.iter() { Ok((
let Some(table_proj) = match ms { LogicalPlanBuilder::scan(table_ref, source, None)?.build()?,
MeasurementSelection::Name(qn) => match qn.name { vec![lit_dict(table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME)],
MeasurementName::Name(ref ident) => { ))
self.create_table_ref(normalize_identifier(ident)) }
} DataSource::Subquery(_) => error::not_implemented("subquery in FROM clause"),
// rewriter is expected to expand the regular expression
MeasurementName::Regex(_) => error::internal(
"unexpected regular expression in FROM clause",
),
},
MeasurementSelection::Subquery(_) => error::not_implemented(
"subquery in FROM clause",
),
}? else { continue };
table_projs.push_back(table_proj);
} }
Ok(table_projs)
} }
/// Create a [LogicalPlan] that refers to the specified `table_name`. /// Create a [LogicalPlan] that refers to the specified `table_name`.
/// ///
/// Normally, this functions will not return a `None`, as tables have been matched] /// Normally, this functions will not return a `None`, as tables have been matched]
/// by the [`rewrite_statement`] function. /// by the [`rewrite_statement`] function.
fn create_table_ref(&self, table_name: String) -> Result<Option<(LogicalPlan, Vec<Expr>)>> { fn create_table_ref(&self, table_name: &str) -> Result<Option<(LogicalPlan, Vec<Expr>)>> {
Ok(if let Ok(source) = self.s.get_table_provider(&table_name) { Ok(if let Ok(source) = self.s.get_table_provider(table_name) {
let table_ref = TableReference::bare(table_name.to_string()); let table_ref = TableReference::bare(table_name.to_owned());
Some(( Some((
LogicalPlanBuilder::scan(table_ref, source, None)?.build()?, LogicalPlanBuilder::scan(table_ref, source, None)?.build()?,
vec![lit_dict(&table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME)], vec![lit_dict(table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME)],
)) ))
} else { } else {
None None
@ -1436,7 +1369,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let mut union_plan = None; let mut union_plan = None;
for table in tables { for table in tables {
let Some(table_schema) = self.s.table_schema(&table) else {continue}; let Some(table_schema) = self.s.table_schema(&table) else {continue};
let Some((plan, measurement_expr)) = self.create_table_ref(table.clone())? else {continue;}; let Some((plan, measurement_expr)) = self.create_table_ref(&table)? else {continue;};
let schemas = Schemas::new(plan.schema())?; let schemas = Schemas::new(plan.schema())?;
let plan = let plan =
@ -1683,7 +1616,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
continue; continue;
} }
let Some((plan, measurement_expr)) = self.create_table_ref(table)? else {continue;}; let Some((plan, measurement_expr)) = self.create_table_ref(&table)? else {continue;};
let schemas = Schemas::new(plan.schema())?; let schemas = Schemas::new(plan.schema())?;
let plan = self.plan_where_clause( let plan = self.plan_where_clause(
@ -1787,7 +1720,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let mut union_plan = None; let mut union_plan = None;
for table in tables { for table in tables {
let Some((plan, _measurement_expr)) = self.create_table_ref(table.clone())? else {continue;}; let Some((plan, _measurement_expr)) = self.create_table_ref(&table)? else {continue;};
let schemas = Schemas::new(plan.schema())?; let schemas = Schemas::new(plan.schema())?;
let plan = let plan =
@ -2128,13 +2061,6 @@ fn conditional_op_to_operator(op: ConditionalOperator) -> Result<Operator> {
} }
} }
// Normalize an identifier. Identifiers in InfluxQL are case sensitive,
// and therefore not transformed to lower case.
fn normalize_identifier(ident: &Identifier) -> String {
// Dereference the identifier to return the unquoted value.
ident.deref().clone()
}
/// Find the index of the time column in the fields list. /// Find the index of the time column in the fields list.
/// ///
/// > **Note** /// > **Note**
@ -3264,11 +3190,11 @@ mod test {
// The `COUNT(f64_field)` aggregate is only projected ones in the Aggregate and reused in the projection // The `COUNT(f64_field)` aggregate is only projected ones in the Aggregate and reused in the projection
assert_snapshot!(plan("SELECT COUNT(f64_field), COUNT(f64_field) + COUNT(f64_field), COUNT(f64_field) * 3 FROM data"), @r###" assert_snapshot!(plan("SELECT COUNT(f64_field), COUNT(f64_field) + COUNT(f64_field), COUNT(f64_field) * 3 FROM data"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field_count_f64_field:Int64;N, count_f64_field:Int64;N] Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_count:Int64;N, count_1:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count, COUNT(data.f64_field) + COUNT(data.f64_field) AS count_f64_field_count_f64_field, COUNT(data.f64_field) * Int64(3) AS count_f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field_count_f64_field:Int64;N, count_f64_field:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count, COUNT(data.f64_field) + COUNT(data.f64_field) AS count_count, COUNT(data.f64_field) * Int64(3) AS count_1 [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_count:Int64;N, count_1:Int64;N]
Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###); "###);
// non-existent tags are excluded from the Aggregate groupBy and Sort operators // non-existent tags are excluded from the Aggregate groupBy and Sort operators
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo, non_existent"), @r###" assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo, non_existent"), @r###"
@ -3280,31 +3206,31 @@ mod test {
// Aggregate expression is projected once and reused in final projection // Aggregate expression is projected once and reused in final projection
assert_snapshot!(plan("SELECT COUNT(f64_field), COUNT(f64_field) * 2 FROM data"), @r###" assert_snapshot!(plan("SELECT COUNT(f64_field), COUNT(f64_field) * 2 FROM data"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field:Int64;N] Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_1:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count, COUNT(data.f64_field) * Int64(2) AS count_f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count, COUNT(data.f64_field) * Int64(2) AS count_1 [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_1:Int64;N]
Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###); "###);
// Aggregate expression selecting non-existent field // Aggregate expression selecting non-existent field
assert_snapshot!(plan("SELECT MEAN(f64_field) + MEAN(non_existent) FROM data"), @r###" assert_snapshot!(plan("SELECT MEAN(f64_field) + MEAN(non_existent) FROM data"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), mean_f64_field_mean_non_existent:Null;N] Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), mean_mean:Null;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, NULL AS mean_f64_field_mean_non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), mean_f64_field_mean_non_existent:Null;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, NULL AS mean_mean [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), mean_mean:Null;N]
EmptyRelation [] EmptyRelation []
"###); "###);
// Aggregate expression with GROUP BY and non-existent field // Aggregate expression with GROUP BY and non-existent field
assert_snapshot!(plan("SELECT MEAN(f64_field) + MEAN(non_existent) FROM data GROUP BY foo"), @r###" assert_snapshot!(plan("SELECT MEAN(f64_field) + MEAN(non_existent) FROM data GROUP BY foo"), @r###"
Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, mean_f64_field_mean_non_existent:Null;N] Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, mean_mean:Null;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, NULL AS mean_f64_field_mean_non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, mean_f64_field_mean_non_existent:Null;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, NULL AS mean_mean [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, mean_mean:Null;N]
Aggregate: groupBy=[[data.foo]], aggr=[[]] [foo:Dictionary(Int32, Utf8);N] Aggregate: groupBy=[[data.foo]], aggr=[[]] [foo:Dictionary(Int32, Utf8);N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###); "###);
// Aggregate expression selecting tag, should treat as non-existent // Aggregate expression selecting tag, should treat as non-existent
assert_snapshot!(plan("SELECT MEAN(f64_field), MEAN(f64_field) + MEAN(non_existent) FROM data"), @r###" assert_snapshot!(plan("SELECT MEAN(f64_field), MEAN(f64_field) + MEAN(non_existent) FROM data"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), mean:Float64;N, mean_f64_field_mean_non_existent:Null;N] Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), mean:Float64;N, mean_mean:Null;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, AVG(data.f64_field) AS mean, NULL AS mean_f64_field_mean_non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), mean:Float64;N, mean_f64_field_mean_non_existent:Null;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, AVG(data.f64_field) AS mean, NULL AS mean_mean [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), mean:Float64;N, mean_mean:Null;N]
Aggregate: groupBy=[[]], aggr=[[AVG(data.f64_field)]] [AVG(data.f64_field):Float64;N] Aggregate: groupBy=[[]], aggr=[[AVG(data.f64_field)]] [AVG(data.f64_field):Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###); "###);
@ -3421,8 +3347,8 @@ mod test {
// Aggregates as part of a binary expression // Aggregates as part of a binary expression
assert_snapshot!(plan("SELECT COUNT(f64_field) + MEAN(f64_field) FROM data GROUP BY TIME(10s) FILL(3.2)"), @r###" assert_snapshot!(plan("SELECT COUNT(f64_field) + MEAN(f64_field) FROM data GROUP BY TIME(10s) FILL(3.2)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_f64_field_mean_f64_field:Float64;N] Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_mean:Float64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce_struct(COUNT(data.f64_field), Int64(3)) + coalesce_struct(AVG(data.f64_field), Float64(3.2)) AS count_f64_field_mean_f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_f64_field_mean_f64_field:Float64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce_struct(COUNT(data.f64_field), Int64(3)) + coalesce_struct(AVG(data.f64_field), Float64(3.2)) AS count_mean [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_mean:Float64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N] GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N]
Aggregate: groupBy=[[datebin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N] Aggregate: groupBy=[[datebin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]

View File

@ -1,7 +1,7 @@
use crate::plan::expr_type_evaluator::evaluate_type; use crate::plan::expr_type_evaluator::TypeEvaluator;
use crate::plan::field::{field_by_name, field_name}; use crate::plan::field::{field_by_name, field_name};
use crate::plan::field_mapper::{field_and_dimensions, FieldTypeMap, TagSet}; use crate::plan::field_mapper::{field_and_dimensions, FieldTypeMap, TagSet};
use crate::plan::ir::{Select, TableReference}; use crate::plan::ir::{DataSource, Select};
use crate::plan::{error, util, SchemaProvider}; use crate::plan::{error, util, SchemaProvider};
use datafusion::common::{DataFusionError, Result}; use datafusion::common::{DataFusionError, Result};
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName}; use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
@ -16,40 +16,39 @@ use influxdb_influxql_parser::select::{
Dimension, Field, FromMeasurementClause, GroupByClause, MeasurementSelection, SelectStatement, Dimension, Field, FromMeasurementClause, GroupByClause, MeasurementSelection, SelectStatement,
}; };
use itertools::Itertools; use itertools::Itertools;
use std::borrow::Borrow;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::ops::{ControlFlow, Deref}; use std::ops::{ControlFlow, Deref};
/// Recursively rewrite the specified [`SelectStatement`] by performing a series of passes /// Recursively rewrite the specified [`SelectStatement`] by performing a series of passes
/// to validate and normalize the statement. /// to validate and normalize the statement.
pub(super) fn rewrite_statement( pub(super) fn rewrite_statement(s: &dyn SchemaProvider, q: &SelectStatement) -> Result<Select> {
s: &dyn SchemaProvider,
q: &SelectStatement,
) -> Result<SelectStatement> {
let mut stmt = map_select(s, q)?; let mut stmt = map_select(s, q)?;
from_drop_empty(s, &mut stmt); from_drop_empty(s, &mut stmt);
field_list_normalize_time(&mut stmt); field_list_normalize_time(&mut stmt);
field_list_rewrite_aliases(&mut stmt.fields)?; field_list_rewrite_aliases(&mut stmt.fields)?;
Ok(stmt.into()) Ok(stmt)
} }
/// Map a `SelectStatement` to a `Select`, which is an intermediate representation to be /// Map a `SelectStatement` to a `Select`, which is an intermediate representation to be
/// used by the InfluxQL planner. /// used by the InfluxQL planner. Mapping also expands any wildcards in the `FROM` and
/// projection clauses.
/// ///
/// # NOTE /// # NOTE
/// ///
/// The goal is that `Select` will eventually be used by the InfluxQL planner. /// The goal is that `Select` will eventually be used by the InfluxQL planner.
pub(super) fn map_select(s: &dyn SchemaProvider, stmt: &SelectStatement) -> Result<Select> { pub(super) fn map_select(s: &dyn SchemaProvider, stmt: &SelectStatement) -> Result<Select> {
check_features(stmt)?;
let mut sel = Select { let mut sel = Select {
fields: vec![], fields: vec![],
from: vec![], from: vec![],
condition: stmt.condition.as_ref().map(|v| (**v).clone()), condition: stmt.condition.clone(),
group_by: stmt.group_by.clone(), group_by: stmt.group_by.clone(),
fill: stmt.fill, fill: stmt.fill,
order_by: stmt.order_by, order_by: stmt.order_by,
limit: stmt.limit.map(|v| *v), limit: stmt.limit,
offset: stmt.offset.map(|v| *v), offset: stmt.offset,
timezone: stmt.timezone.map(|v| *v), timezone: stmt.timezone.map(|v| *v),
}; };
from_expand_wildcards(s, stmt, &mut sel)?; from_expand_wildcards(s, stmt, &mut sel)?;
@ -58,6 +57,24 @@ pub(super) fn map_select(s: &dyn SchemaProvider, stmt: &SelectStatement) -> Resu
Ok(sel) Ok(sel)
} }
/// Asserts that the `SELECT` statement does not use any unimplemented features.
///
/// The list of unimplemented or unsupported features are listed below.
///
/// # `SLIMIT` and `SOFFSET`
///
/// * `SLIMIT` and `SOFFSET` don't work as expected per issue [#7571]
/// * This issue [is noted](https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#the-slimit-clause) in our official documentation
///
/// [#7571]: https://github.com/influxdata/influxdb/issues/7571
fn check_features(stmt: &SelectStatement) -> Result<()> {
if stmt.series_limit.is_some() || stmt.series_offset.is_some() {
return error::not_implemented("SLIMIT or SOFFSET");
}
Ok(())
}
/// Ensure the time field is added to all projections, /// Ensure the time field is added to all projections,
/// and is moved to the first position, which is a requirement /// and is moved to the first position, which is a requirement
/// for InfluxQL compatibility. /// for InfluxQL compatibility.
@ -99,7 +116,7 @@ fn field_list_normalize_time(stmt: &mut Select) {
normalize_time(stmt, false); normalize_time(stmt, false);
for stmt in stmt.from.iter_mut().filter_map(|ms| match ms { for stmt in stmt.from.iter_mut().filter_map(|ms| match ms {
TableReference::Subquery(stmt) => Some(stmt), DataSource::Subquery(stmt) => Some(stmt),
_ => None, _ => None,
}) { }) {
normalize_time(stmt, true) normalize_time(stmt, true)
@ -121,7 +138,7 @@ fn from_expand_wildcards(
.. ..
} => { } => {
if s.table_exists(name) { if s.table_exists(name) {
new_from.push(TableReference::Name(name.deref().to_owned())) new_from.push(DataSource::Table(name.deref().to_owned()))
} }
} }
QualifiedMeasurementName { QualifiedMeasurementName {
@ -132,11 +149,11 @@ fn from_expand_wildcards(
s.table_names() s.table_names()
.into_iter() .into_iter()
.filter(|table| re.is_match(table)) .filter(|table| re.is_match(table))
.for_each(|table| new_from.push(TableReference::Name(table.to_owned()))); .for_each(|table| new_from.push(DataSource::Table(table.to_owned())));
} }
}, },
MeasurementSelection::Subquery(q) => { MeasurementSelection::Subquery(q) => {
new_from.push(TableReference::Subquery(Box::new(map_select(s, q)?))) new_from.push(DataSource::Subquery(Box::new(map_select(s, q)?)))
} }
} }
} }
@ -150,7 +167,7 @@ fn from_drop_empty(s: &dyn SchemaProvider, stmt: &mut Select) {
use schema::InfluxColumnType; use schema::InfluxColumnType;
stmt.from.retain_mut(|tr| { stmt.from.retain_mut(|tr| {
match tr { match tr {
TableReference::Name(name) => { DataSource::Table(name) => {
// drop any measurements that have no matching fields in the // drop any measurements that have no matching fields in the
// projection // projection
@ -168,7 +185,7 @@ fn from_drop_empty(s: &dyn SchemaProvider, stmt: &mut Select) {
false false
} }
} }
TableReference::Subquery(q) => { DataSource::Subquery(q) => {
from_drop_empty(s, q); from_drop_empty(s, q);
if q.from.is_empty() { if q.from.is_empty() {
return false; return false;
@ -191,14 +208,14 @@ fn from_drop_empty(s: &dyn SchemaProvider, stmt: &mut Select) {
/// Determine the merged fields and tags of the `FROM` clause. /// Determine the merged fields and tags of the `FROM` clause.
fn from_field_and_dimensions( fn from_field_and_dimensions(
s: &dyn SchemaProvider, s: &dyn SchemaProvider,
from: &[TableReference], from: &[DataSource],
) -> Result<(FieldTypeMap, TagSet)> { ) -> Result<(FieldTypeMap, TagSet)> {
let mut fs = FieldTypeMap::new(); let mut fs = FieldTypeMap::new();
let mut ts = TagSet::new(); let mut ts = TagSet::new();
for tr in from { for tr in from {
match tr { match tr {
TableReference::Name(name) => { DataSource::Table(name) => {
let (field_set, tag_set) = match field_and_dimensions(s, name.as_str())? { let (field_set, tag_set) = match field_and_dimensions(s, name.as_str())? {
Some(res) => res, Some(res) => res,
None => continue, None => continue,
@ -220,11 +237,11 @@ fn from_field_and_dimensions(
ts.extend(tag_set); ts.extend(tag_set);
} }
TableReference::Subquery(select) => { DataSource::Subquery(select) => {
let tv = TypeEvaluator::new(s, &select.from);
for f in &select.fields { for f in &select.fields {
let dt = match evaluate_type(s, &f.expr, &select.from)? { let Some(dt) = tv.eval_type(&f.expr)? else {
Some(dt) => dt, continue
None => continue,
}; };
let name = field_name(f); let name = field_name(f);
@ -332,16 +349,14 @@ fn field_list_expand_wildcards(
// Attempt to rewrite all variable references in the fields with their types, if one // Attempt to rewrite all variable references in the fields with their types, if one
// hasn't been specified. // hasn't been specified.
if let ControlFlow::Break(e) = sel.fields.iter_mut().try_for_each(|f| { if let ControlFlow::Break(e) = sel.fields.iter_mut().try_for_each(|f| {
walk_expr_mut::<DataFusionError>(&mut f.expr, &mut |e| { let tv = TypeEvaluator::new(s, &sel.from);
if matches!(e, Expr::VarRef(_)) {
let new_type = match evaluate_type(s, e.borrow(), &sel.from) {
Err(e) => ControlFlow::Break(e)?,
Ok(v) => v,
};
if let Expr::VarRef(v) = e { walk_expr_mut::<DataFusionError>(&mut f.expr, &mut |e| {
v.data_type = new_type; if let Expr::VarRef(ref mut v) = e {
} v.data_type = match tv.eval_var_ref(v) {
Ok(v) => v,
Err(e) => ControlFlow::Break(e)?,
};
} }
ControlFlow::Continue(()) ControlFlow::Continue(())
}) })
@ -690,7 +705,7 @@ struct FieldChecker {
} }
impl FieldChecker { impl FieldChecker {
fn check_fields(&mut self, q: &SelectStatement) -> Result<ProjectionType> { fn check_fields(&mut self, q: &Select) -> Result<ProjectionType> {
q.fields.iter().try_for_each(|f| self.check_expr(&f.expr))?; q.fields.iter().try_for_each(|f| self.check_expr(&f.expr))?;
match self.function_count() { match self.function_count() {
@ -1300,7 +1315,7 @@ pub(crate) struct SelectStatementInfo {
/// ///
/// * Are not combined with other aggregate, selector or window-like functions and may /// * Are not combined with other aggregate, selector or window-like functions and may
/// only project additional fields /// only project additional fields
pub(crate) fn select_statement_info(q: &SelectStatement) -> Result<SelectStatementInfo> { pub(super) fn select_statement_info(q: &Select) -> Result<SelectStatementInfo> {
let has_group_by_time = q let has_group_by_time = q
.group_by .group_by
.as_ref() .as_ref()
@ -1319,8 +1334,9 @@ pub(crate) fn select_statement_info(q: &SelectStatement) -> Result<SelectStateme
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::plan::ir::Select;
use crate::plan::rewriter::{ use crate::plan::rewriter::{
has_wildcards, rewrite_statement, select_statement_info, ProjectionType, has_wildcards, map_select, rewrite_statement, select_statement_info, ProjectionType,
}; };
use crate::plan::test_utils::{parse_select, MockSchemaProvider}; use crate::plan::test_utils::{parse_select, MockSchemaProvider};
use assert_matches::assert_matches; use assert_matches::assert_matches;
@ -1329,6 +1345,12 @@ mod test {
#[test] #[test]
fn test_select_statement_info() { fn test_select_statement_info() {
let namespace = MockSchemaProvider::default();
let parse_select = |s: &str| -> Select {
let select = parse_select(s);
map_select(&namespace, &select).unwrap()
};
let info = select_statement_info(&parse_select("SELECT foo, bar FROM cpu")).unwrap(); let info = select_statement_info(&parse_select("SELECT foo, bar FROM cpu")).unwrap();
assert_matches!(info.projection_type, ProjectionType::Raw); assert_matches!(info.projection_type, ProjectionType::Raw);
@ -1368,6 +1390,12 @@ mod test {
/// by `select_statement_info`. /// by `select_statement_info`.
#[test] #[test]
fn test_select_statement_info_functions() { fn test_select_statement_info_functions() {
let namespace = MockSchemaProvider::default();
let parse_select = |s: &str| -> Select {
let select = parse_select(s);
map_select(&namespace, &select).unwrap()
};
// percentile // percentile
let sel = parse_select("SELECT percentile(foo, 2) FROM cpu"); let sel = parse_select("SELECT percentile(foo, 2) FROM cpu");
select_statement_info(&sel).unwrap(); select_statement_info(&sel).unwrap();
@ -1582,16 +1610,10 @@ mod test {
select_statement_info(&sel).unwrap(); select_statement_info(&sel).unwrap();
let sel = parse_select("SELECT count(distinct('foo')) FROM cpu"); let sel = parse_select("SELECT count(distinct('foo')) FROM cpu");
assert_error!(select_statement_info(&sel), DataFusionError::Plan(ref s) if s == "expected field argument in distinct()"); assert_error!(select_statement_info(&sel), DataFusionError::Plan(ref s) if s == "expected field argument in distinct()");
let sel = parse_select("SELECT count(distinct foo) FROM cpu");
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "InfluxQL internal error: unexpected distinct clause in count");
// Test rules for math functions // Test rules for math functions
let sel = parse_select("SELECT abs(usage_idle) FROM cpu"); let sel = parse_select("SELECT abs(usage_idle) FROM cpu");
select_statement_info(&sel).unwrap(); select_statement_info(&sel).unwrap();
let sel = parse_select("SELECT abs(*) + ceil(foo) FROM cpu");
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "InfluxQL internal error: unexpected wildcard");
let sel = parse_select("SELECT abs(/f/) + ceil(foo) FROM cpu");
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "InfluxQL internal error: unexpected regex");
// Fallible // Fallible
@ -1611,350 +1633,423 @@ mod test {
let sel = parse_select("SELECT foo, 1 FROM cpu"); let sel = parse_select("SELECT foo, 1 FROM cpu");
assert_error!(select_statement_info(&sel), DataFusionError::Plan(ref s) if s == "field must contain at least one variable"); assert_error!(select_statement_info(&sel), DataFusionError::Plan(ref s) if s == "field must contain at least one variable");
// wildcard expansion is not supported in binary expressions for aggregates
let sel = parse_select("SELECT count(*) + count(foo) FROM cpu");
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "InfluxQL internal error: unexpected wildcard or regex");
// regex expansion is not supported in binary expressions
let sel = parse_select("SELECT sum(/foo/) + count(foo) FROM cpu");
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "InfluxQL internal error: unexpected wildcard or regex");
// aggregate functions require a field reference // aggregate functions require a field reference
let sel = parse_select("SELECT sum(1) FROM cpu"); let sel = parse_select("SELECT sum(1) FROM cpu");
assert_error!(select_statement_info(&sel), DataFusionError::Plan(ref s) if s == "expected field argument in sum(), got Literal(Integer(1))"); assert_error!(select_statement_info(&sel), DataFusionError::Plan(ref s) if s == "expected field argument in sum(), got Literal(Integer(1))");
} }
#[test] mod rewrite_statement {
fn test_rewrite_statement() { use super::*;
let namespace = MockSchemaProvider::default(); use datafusion::common::Result;
// Exact, match use influxdb_influxql_parser::select::SelectStatement;
let stmt = parse_select("SELECT usage_user FROM cpu");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, usage_user::float AS usage_user FROM cpu"
);
// Duplicate columns do not have conflicting aliases /// Test implementation that converts `Select` to `SelectStatement` so that it can be
let stmt = parse_select("SELECT usage_user, usage_user FROM cpu"); /// converted back to a string.
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); fn rewrite_statement(
assert_eq!( s: &MockSchemaProvider,
stmt.to_string(), q: &SelectStatement,
"SELECT time::timestamp AS time, usage_user::float AS usage_user, usage_user::float AS usage_user_1 FROM cpu" ) -> Result<SelectStatement> {
); let stmt = super::rewrite_statement(s, q)?;
Ok(stmt.into())
}
// Multiple aliases with no conflicts /// Validating types for simple projections
let stmt = parse_select("SELECT usage_user as usage_user_1, usage_user FROM cpu"); #[test]
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); fn projection_simple() {
assert_eq!( let namespace = MockSchemaProvider::default();
stmt.to_string(),
"SELECT time::timestamp AS time, usage_user::float AS usage_user_1, usage_user::float AS usage_user FROM cpu"
);
// Multiple aliases with conflicts // Exact, match
let stmt = let stmt = parse_select("SELECT usage_user FROM cpu");
parse_select("SELECT usage_user as usage_user_1, usage_user, usage_user, usage_user as usage_user_2, usage_user, usage_user_2 FROM cpu"); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!(
assert_eq!(stmt.to_string(), "SELECT time::timestamp AS time, usage_user::float AS usage_user_1, usage_user::float AS usage_user, usage_user::float AS usage_user_3, usage_user::float AS usage_user_2, usage_user::float AS usage_user_4, usage_user_2 AS usage_user_2_1 FROM cpu"); stmt.to_string(),
"SELECT time::timestamp AS time, usage_user::float AS usage_user FROM cpu"
);
// Only include measurements with at least one field projection // Duplicate columns do not have conflicting aliases
let stmt = parse_select("SELECT usage_idle FROM cpu, disk"); let stmt = parse_select("SELECT usage_user, usage_user FROM cpu");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!( assert_eq!(
stmt.to_string(), stmt.to_string(),
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu" "SELECT time::timestamp AS time, usage_user::float AS usage_user, usage_user::float AS usage_user_1 FROM cpu"
); );
// Rewriting FROM clause // Multiple aliases with no conflicts
let stmt = parse_select("SELECT usage_user as usage_user_1, usage_user FROM cpu");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, usage_user::float AS usage_user_1, usage_user::float AS usage_user FROM cpu"
);
// Regex, match, fields from multiple measurements // Multiple aliases with conflicts
let stmt = parse_select("SELECT bytes_free, bytes_read FROM /d/"); let stmt =
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); parse_select("SELECT usage_user as usage_user_1, usage_user, usage_user, usage_user as usage_user_2, usage_user, usage_user_2 FROM cpu");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
stmt.to_string(), assert_eq!(stmt.to_string(), "SELECT time::timestamp AS time, usage_user::float AS usage_user_1, usage_user::float AS usage_user, usage_user::float AS usage_user_3, usage_user::float AS usage_user_2, usage_user::float AS usage_user_4, usage_user_2 AS usage_user_2_1 FROM cpu");
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free, bytes_read::integer AS bytes_read FROM disk, diskio"
);
// Regex matches multiple measurement, but only one has a matching field // Only include measurements with at least one field projection
let stmt = parse_select("SELECT bytes_free FROM /d/"); let stmt = parse_select("SELECT usage_idle FROM cpu, disk");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!( assert_eq!(
stmt.to_string(), stmt.to_string(),
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free FROM disk" "SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu"
); );
// Exact, no match // Field does not exist in single measurement
let stmt = parse_select("SELECT usage_idle FROM foo"); let stmt = parse_select("SELECT usage_idle, bytes_free FROM cpu");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert!(stmt.from.is_empty()); assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle, bytes_free AS bytes_free FROM cpu"
);
// Regex, no match // Field exists in each measurement
let stmt = parse_select("SELECT bytes_free FROM /^d$/"); let stmt = parse_select("SELECT usage_idle, bytes_free FROM cpu, disk");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert!(stmt.from.is_empty()); assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle, bytes_free::integer AS bytes_free FROM cpu, disk"
);
}
// Rewriting projection list /// Validate the expansion of the `FROM` clause using regular expressions
#[test]
fn from_expand_wildcards() {
let namespace = MockSchemaProvider::default();
// Single wildcard, single measurement // Regex, match, fields from multiple measurements
let stmt = parse_select("SELECT * FROM cpu"); let stmt = parse_select("SELECT bytes_free, bytes_read FROM /d/");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!( assert_eq!(
stmt.to_string(), stmt.to_string(),
"SELECT time::timestamp AS time, cpu::tag AS cpu, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu" "SELECT time::timestamp AS time, bytes_free::integer AS bytes_free, bytes_read::integer AS bytes_read FROM disk, diskio"
); );
let stmt = parse_select("SELECT * FROM cpu, disk"); // Regex matches multiple measurement, but only one has a matching field
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = parse_select("SELECT bytes_free FROM /d/");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
stmt.to_string(), assert_eq!(
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, cpu::tag AS cpu, device::tag AS device, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk" stmt.to_string(),
); "SELECT time::timestamp AS time, bytes_free::integer AS bytes_free FROM disk"
);
// Regular expression selects fields from multiple measurements // Exact, no match
let stmt = parse_select("SELECT /usage|bytes/ FROM cpu, disk"); let stmt = parse_select("SELECT usage_idle FROM foo");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!( assert!(stmt.from.is_empty());
stmt.to_string(),
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
);
// Selective wildcard for tags // Regex, no match
let stmt = parse_select("SELECT *::tag, usage_idle FROM cpu"); let stmt = parse_select("SELECT bytes_free FROM /^d$/");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!( assert!(stmt.from.is_empty());
stmt.to_string(), }
"SELECT time::timestamp AS time, cpu::tag AS cpu, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle FROM cpu"
);
// Selective wildcard for tags only should not select any measurements /// Expanding the projection using wildcards
let stmt = parse_select("SELECT *::tag FROM cpu"); #[test]
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); fn projection_expand_wildcards() {
assert!(stmt.from.is_empty()); let namespace = MockSchemaProvider::default();
// Selective wildcard for fields // Single wildcard, single measurement
let stmt = parse_select("SELECT *::field FROM cpu"); let stmt = parse_select("SELECT * FROM cpu");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!( assert_eq!(
stmt.to_string(), stmt.to_string(),
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu" "SELECT time::timestamp AS time, cpu::tag AS cpu, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu"
); );
// Mixed fields and wildcards let stmt = parse_select("SELECT * FROM cpu, disk");
let stmt = parse_select("SELECT usage_idle, *::tag FROM cpu"); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!(
assert_eq!( stmt.to_string(),
stmt.to_string(), "SELECT time::timestamp AS time, bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, cpu::tag AS cpu, device::tag AS device, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle, cpu::tag AS cpu, host::tag AS host, region::tag AS region FROM cpu" );
);
// GROUP BY expansion // Regular expression selects fields from multiple measurements
let stmt = parse_select("SELECT /usage|bytes/ FROM cpu, disk");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
);
let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY host"); // Selective wildcard for tags
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = parse_select("SELECT *::tag, usage_idle FROM cpu");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
stmt.to_string(), assert_eq!(
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu GROUP BY host" stmt.to_string(),
); "SELECT time::timestamp AS time, cpu::tag AS cpu, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle FROM cpu"
);
let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY *"); // Selective wildcard for tags only should not select any measurements
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = parse_select("SELECT *::tag FROM cpu");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
stmt.to_string(), assert!(stmt.from.is_empty());
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu GROUP BY cpu, host, region"
);
// Does not include tags in projection when expanded in GROUP BY // Selective wildcard for fields
let stmt = parse_select("SELECT * FROM cpu GROUP BY *"); let stmt = parse_select("SELECT *::field FROM cpu");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!( assert_eq!(
stmt.to_string(), stmt.to_string(),
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu GROUP BY cpu, host, region" "SELECT time::timestamp AS time, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu"
); );
// Does include explicitly listed tags in projection // Mixed fields and wildcards
let stmt = parse_select("SELECT host, * FROM cpu GROUP BY *"); let stmt = parse_select("SELECT usage_idle, *::tag FROM cpu");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!( assert_eq!(
stmt.to_string(), stmt.to_string(),
"SELECT time::timestamp AS time, host::tag AS host, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu GROUP BY cpu, host, region" "SELECT time::timestamp AS time, usage_idle::float AS usage_idle, cpu::tag AS cpu, host::tag AS host, region::tag AS region FROM cpu"
); );
// Fallible let stmt = parse_select("SELECT * FROM merge_00, merge_01");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, col0::float AS col0, col0::tag AS col0_1, col1::float AS col1, col1::tag AS col1_1, col2::string AS col2, col3::string AS col3 FROM merge_00, merge_01"
);
// Invalid regex // This should only select merge_01, as col0 is a tag in merge_00
let stmt = parse_select("SELECT usage_idle FROM /(not/"); let stmt = parse_select("SELECT /col0/ FROM merge_00, merge_01");
let err = rewrite_statement(&namespace, &stmt).unwrap_err(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_contains!(err.to_string(), "invalid regular expression"); assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, col0::float AS col0, col0::tag AS col0_1 FROM merge_01"
);
}
// Subqueries #[test]
fn group_by() {
let namespace = MockSchemaProvider::default();
// Subquery, exact, match let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY host");
let stmt = parse_select("SELECT usage_idle FROM (SELECT usage_idle FROM cpu)"); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!(
assert_eq!( stmt.to_string(),
stmt.to_string(), "SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu GROUP BY host"
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM (SELECT time::timestamp AS time, usage_idle::float FROM cpu)" );
);
// Subquery, regex, match let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY *");
let stmt = parse_select("SELECT bytes_free FROM (SELECT bytes_free, bytes_read FROM /d/)"); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!(
assert_eq!( stmt.to_string(),
stmt.to_string(), "SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu GROUP BY cpu, host, region"
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free FROM (SELECT time::timestamp AS time, bytes_free::integer, bytes_read::integer FROM disk, diskio)" );
);
// Subquery, exact, no match // Does not include tags in projection when expanded in GROUP BY
let stmt = parse_select("SELECT usage_idle FROM (SELECT usage_idle FROM foo)"); let stmt = parse_select("SELECT * FROM cpu GROUP BY *");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert!(stmt.from.is_empty()); assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu GROUP BY cpu, host, region"
);
// Subquery, regex, no match // Does include explicitly listed tags in projection
let stmt = parse_select("SELECT bytes_free FROM (SELECT bytes_free FROM /^d$/)"); let stmt = parse_select("SELECT host, * FROM cpu GROUP BY *");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert!(stmt.from.is_empty()); assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, host::tag AS host, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu GROUP BY cpu, host, region"
);
}
// Correct data type is resolved from subquery /// Uncategorized fallible cases
let stmt = parse_select("SELECT *::field FROM (SELECT usage_system + usage_idle FROM cpu)"); #[test]
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); fn fallible() {
assert_eq!( let namespace = MockSchemaProvider::default();
stmt.to_string(),
"SELECT time::timestamp AS time, usage_system_usage_idle::float AS usage_system_usage_idle FROM (SELECT time::timestamp AS time, usage_system::float + usage_idle::float FROM cpu)"
);
// Subquery, no fields projected should be dropped // Invalid regex
let stmt = parse_select("SELECT usage_idle FROM cpu, (SELECT usage_system FROM cpu)"); let stmt = parse_select("SELECT usage_idle FROM /(not/");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let err = rewrite_statement(&namespace, &stmt).unwrap_err();
assert_eq!( assert_contains!(err.to_string(), "invalid regular expression");
stmt.to_string(),
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu"
);
// Outer query are permitted to project tags only, as long as there are other fields let stmt = parse_select("SELECT *::field + *::tag FROM cpu");
// in the subquery let err = rewrite_statement(&namespace, &stmt).unwrap_err();
let stmt = parse_select("SELECT cpu FROM (SELECT cpu, usage_system FROM cpu)"); assert_eq!(
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); err.to_string(),
assert_eq!( "Error during planning: unsupported expression: contains a wildcard or regular expression"
stmt.to_string(), );
"SELECT time::timestamp AS time, cpu::tag AS cpu FROM (SELECT time::timestamp AS time, cpu::tag, usage_system::float FROM cpu)"
);
// Outer FROM should be empty, as the subquery does not project any fields let stmt = parse_select("SELECT COUNT(*::tag) FROM cpu");
let stmt = parse_select("SELECT cpu FROM (SELECT cpu FROM cpu)"); let err = rewrite_statement(&namespace, &stmt).unwrap_err();
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!(
assert!(stmt.from.is_empty()); err.to_string(),
"Error during planning: unable to use tag as wildcard in count()"
);
// Binary expression let stmt = parse_select("SELECT usage_idle FROM cpu SLIMIT 1");
let stmt = parse_select("SELECT bytes_free+bytes_used FROM disk"); let err = rewrite_statement(&namespace, &stmt).unwrap_err();
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!(
assert_eq!( err.to_string(),
stmt.to_string(), "This feature is not implemented: SLIMIT or SOFFSET"
"SELECT time::timestamp AS time, bytes_free::integer + bytes_used::integer AS bytes_free_bytes_used FROM disk" );
);
// Unary expressions let stmt = parse_select("SELECT usage_idle FROM cpu SOFFSET 1");
let stmt = parse_select("SELECT -bytes_free FROM disk"); let err = rewrite_statement(&namespace, &stmt).unwrap_err();
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!(
assert_eq!( err.to_string(),
stmt.to_string(), "This feature is not implemented: SLIMIT or SOFFSET"
"SELECT time::timestamp AS time, -1 * bytes_free::integer AS bytes_free FROM disk" );
); }
// DISTINCT clause /// Verify subqueries
#[test]
fn subqueries() {
let namespace = MockSchemaProvider::default();
// COUNT(DISTINCT) // Subquery, exact, match
let stmt = parse_select("SELECT COUNT(DISTINCT bytes_free) FROM disk"); let stmt = parse_select("SELECT usage_idle FROM (SELECT usage_idle FROM cpu)");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!( assert_eq!(
stmt.to_string(), stmt.to_string(),
"SELECT time::timestamp AS time, count(distinct(bytes_free::integer)) AS count FROM disk" "SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM (SELECT time::timestamp AS time, usage_idle::float FROM cpu)"
); );
let stmt = parse_select("SELECT DISTINCT bytes_free FROM disk"); // Subquery, regex, match
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt =
assert_eq!( parse_select("SELECT bytes_free FROM (SELECT bytes_free, bytes_read FROM /d/)");
stmt.to_string(), let stmt = rewrite_statement(&namespace, &stmt).unwrap();
"SELECT time::timestamp AS time, distinct(bytes_free::integer) AS \"distinct\" FROM disk" assert_eq!(
); stmt.to_string(),
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free FROM (SELECT time::timestamp AS time, bytes_free::integer, bytes_read::integer FROM disk, diskio)"
);
// Call expressions // Subquery, exact, no match
let stmt = parse_select("SELECT usage_idle FROM (SELECT usage_idle FROM foo)");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert!(stmt.from.is_empty());
let stmt = parse_select("SELECT COUNT(field_i64) FROM temp_01"); // Subquery, regex, no match
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = parse_select("SELECT bytes_free FROM (SELECT bytes_free FROM /^d$/)");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
stmt.to_string(), assert!(stmt.from.is_empty());
"SELECT time::timestamp AS time, count(field_i64::integer) AS count FROM temp_01"
);
// Duplicate aggregate columns // Correct data type is resolved from subquery
let stmt = parse_select("SELECT COUNT(field_i64), COUNT(field_i64) FROM temp_01"); let stmt =
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); parse_select("SELECT *::field FROM (SELECT usage_system + usage_idle FROM cpu)");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
stmt.to_string(), assert_eq!(
"SELECT time::timestamp AS time, count(field_i64::integer) AS count, count(field_i64::integer) AS count_1 FROM temp_01" stmt.to_string(),
); "SELECT time::timestamp AS time, usage_system_usage_idle::float AS usage_system_usage_idle FROM (SELECT time::timestamp AS time, usage_system::float + usage_idle::float FROM cpu)"
);
let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01"); // Subquery, no fields projected should be dropped
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = parse_select("SELECT usage_idle FROM cpu, (SELECT usage_system FROM cpu)");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
stmt.to_string(), assert_eq!(
"SELECT time::timestamp AS time, count(field_f64::float) AS count FROM temp_01" stmt.to_string(),
); "SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu"
);
// Expands all fields // Outer query are permitted to project tags only, as long as there are other fields
let stmt = parse_select("SELECT COUNT(*) FROM temp_01"); // in the subquery
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = parse_select("SELECT cpu FROM (SELECT cpu, usage_system FROM cpu)");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
stmt.to_string(), assert_eq!(
"SELECT time::timestamp AS time, count(field_f64::float) AS count_field_f64, count(field_i64::integer) AS count_field_i64, count(field_str::string) AS count_field_str, count(field_u64::unsigned) AS count_field_u64, count(shared_field0::float) AS count_shared_field0 FROM temp_01" stmt.to_string(),
); "SELECT time::timestamp AS time, cpu::tag AS cpu FROM (SELECT time::timestamp AS time, cpu::tag, usage_system::float FROM cpu)"
);
// Expands matching fields // Outer FROM should be empty, as the subquery does not project any fields
let stmt = parse_select("SELECT COUNT(/64$/) FROM temp_01"); let stmt = parse_select("SELECT cpu FROM (SELECT cpu FROM cpu)");
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!( assert!(stmt.from.is_empty());
stmt.to_string(), }
"SELECT time::timestamp AS time, count(field_f64::float) AS count_field_f64, count(field_i64::integer) AS count_field_i64, count(field_u64::unsigned) AS count_field_u64 FROM temp_01"
);
// Expands only numeric fields /// `DISTINCT` clause and `distinct` function
let stmt = parse_select("SELECT SUM(*) FROM temp_01"); #[test]
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); fn projection_distinct() {
assert_eq!( let namespace = MockSchemaProvider::default();
stmt.to_string(),
"SELECT time::timestamp AS time, sum(field_f64::float) AS sum_field_f64, sum(field_i64::integer) AS sum_field_i64, sum(field_u64::unsigned) AS sum_field_u64, sum(shared_field0::float) AS sum_shared_field0 FROM temp_01"
);
let stmt = parse_select("SELECT * FROM merge_00, merge_01"); // COUNT(DISTINCT)
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); let stmt = parse_select("SELECT COUNT(DISTINCT bytes_free) FROM disk");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
stmt.to_string(), assert_eq!(
"SELECT time::timestamp AS time, col0::float AS col0, col0::tag AS col0_1, col1::float AS col1, col1::tag AS col1_1, col2::string AS col2, col3::string AS col3 FROM merge_00, merge_01" stmt.to_string(),
); "SELECT time::timestamp AS time, count(distinct(bytes_free::integer)) AS count FROM disk"
);
// This should only select merge_01, as col0 is a tag in merge_00 let stmt = parse_select("SELECT DISTINCT bytes_free FROM disk");
let stmt = parse_select("SELECT /col0/ FROM merge_00, merge_01"); let stmt = rewrite_statement(&namespace, &stmt).unwrap();
let stmt = rewrite_statement(&namespace, &stmt).unwrap(); assert_eq!(
assert_eq!( stmt.to_string(),
stmt.to_string(), "SELECT time::timestamp AS time, distinct(bytes_free::integer) AS \"distinct\" FROM disk"
"SELECT time::timestamp AS time, col0::float AS col0, col0::tag AS col0_1 FROM merge_01" );
); }
// Fallible cases /// Projections with unary and binary expressions
#[test]
fn projection_unary_binary_expr() {
let namespace = MockSchemaProvider::default();
let stmt = parse_select("SELECT *::field + *::tag FROM cpu"); // Binary expression
let err = rewrite_statement(&namespace, &stmt).unwrap_err(); let stmt = parse_select("SELECT bytes_free+bytes_used FROM disk");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
err.to_string(), assert_eq!(
"Error during planning: unsupported expression: contains a wildcard or regular expression" stmt.to_string(),
); "SELECT time::timestamp AS time, bytes_free::integer + bytes_used::integer AS bytes_free_bytes_used FROM disk"
);
let stmt = parse_select("SELECT COUNT(*::tag) FROM cpu"); // Unary expressions
let err = rewrite_statement(&namespace, &stmt).unwrap_err(); let stmt = parse_select("SELECT -bytes_free FROM disk");
assert_eq!( let stmt = rewrite_statement(&namespace, &stmt).unwrap();
err.to_string(), assert_eq!(
"Error during planning: unable to use tag as wildcard in count()" stmt.to_string(),
); "SELECT time::timestamp AS time, -1 * bytes_free::integer AS bytes_free FROM disk"
);
}
/// Projections which contain function calls
#[test]
fn projection_call_expr() {
let namespace = MockSchemaProvider::default();
let stmt = parse_select("SELECT COUNT(field_i64) FROM temp_01");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, count(field_i64::integer) AS count FROM temp_01"
);
// Duplicate aggregate columns
let stmt = parse_select("SELECT COUNT(field_i64), COUNT(field_i64) FROM temp_01");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, count(field_i64::integer) AS count, count(field_i64::integer) AS count_1 FROM temp_01"
);
let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, count(field_f64::float) AS count FROM temp_01"
);
// Expands all fields
let stmt = parse_select("SELECT COUNT(*) FROM temp_01");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, count(field_f64::float) AS count_field_f64, count(field_i64::integer) AS count_field_i64, count(field_str::string) AS count_field_str, count(field_u64::unsigned) AS count_field_u64, count(shared_field0::float) AS count_shared_field0 FROM temp_01"
);
// Expands matching fields
let stmt = parse_select("SELECT COUNT(/64$/) FROM temp_01");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, count(field_f64::float) AS count_field_f64, count(field_i64::integer) AS count_field_i64, count(field_u64::unsigned) AS count_field_u64 FROM temp_01"
);
// Expands only numeric fields
let stmt = parse_select("SELECT SUM(*) FROM temp_01");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT time::timestamp AS time, sum(field_f64::float) AS sum_field_f64, sum(field_i64::integer) AS sum_field_i64, sum(field_u64::unsigned) AS sum_field_u64, sum(shared_field0::float) AS sum_shared_field0 FROM temp_01"
);
}
} }
#[test] #[test]