feat: complete InfluxQL subquery compatibility

Closes #7794
pull/24376/head
Stuart Carnie 2023-05-22 16:22:54 +10:00
parent ccd73a0b32
commit af76865b2c
4 changed files with 180 additions and 217 deletions


@ -2,8 +2,6 @@
//! statement after it has been processed
use crate::plan::rewriter::ProjectionType;
use crate::plan::{error, SchemaProvider};
use datafusion::common::Result;
use influxdb_influxql_parser::common::{
LimitClause, MeasurementName, OffsetClause, OrderByClause, QualifiedMeasurementName,
WhereClause,
@ -13,14 +11,14 @@ use influxdb_influxql_parser::select::{
FieldList, FillClause, FromMeasurementClause, GroupByClause, MeasurementSelection,
SelectStatement, TimeZoneClause,
};
use schema::{InfluxColumnType, Schema};
use schema::InfluxColumnType;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
/// A set of tag keys.
pub(super) type TagSet = HashSet<String>;
/// Represents a validated and normalized top-level [`SelectStatement]`.
/// Represents a validated and normalized top-level [`SelectStatement`].
#[derive(Debug, Default, Clone)]
pub(super) struct SelectQuery {
pub(super) select: Select,
@ -119,36 +117,6 @@ pub(super) enum DataSource {
Subquery(Box<Select>),
}
impl DataSource {
pub(super) fn schema(&self, s: &dyn SchemaProvider) -> Result<DataSourceSchema<'_>> {
match self {
Self::Table(table_name) => s
.table_schema(table_name)
.map(DataSourceSchema::Table)
.ok_or_else(|| error::map::internal("expected table")),
Self::Subquery(q) => Ok(DataSourceSchema::Subquery(q)),
}
}
}
pub(super) enum DataSourceSchema<'a> {
Table(Schema),
Subquery(&'a Select),
}
impl<'a> DataSourceSchema<'a> {
/// Returns `true` if the specified name is a tag field or a projection of a tag field if
/// the `DataSource` is a subquery.
pub(super) fn is_tag_field(&self, name: &str) -> bool {
match self {
DataSourceSchema::Table(s) => {
matches!(s.field_type_by_name(name), Some(InfluxColumnType::Tag))
}
DataSourceSchema::Subquery(q) => q.tag_set.contains(name),
}
}
}
#[derive(Debug, Clone)]
pub(super) struct Field {
pub(super) expr: Expr,


@ -2,12 +2,12 @@ mod select;
use crate::plan::ir::{DataSource, Field, Select, SelectQuery};
use crate::plan::planner::select::{
fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort,
fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort, ProjectionInfo,
};
use crate::plan::planner_time_range_expression::{
duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr,
};
use crate::plan::rewriter::{find_tables, rewrite_statement, ProjectionType};
use crate::plan::rewriter::{find_table_names, rewrite_statement, ProjectionType};
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
use crate::plan::var_ref::var_ref_data_type_to_data_type;
use crate::plan::{error, planner_rewrite_expression};
@ -25,11 +25,11 @@ use datafusion::logical_expr::logical_plan::builder::project;
use datafusion::logical_expr::logical_plan::Analyze;
use datafusion::logical_expr::utils::find_aggregate_exprs;
use datafusion::logical_expr::{
binary_expr, col, date_bin, expr, expr::WindowFunction, lit, lit_timestamp_nano, now,
binary_expr, col, date_bin, expr, expr::WindowFunction, lit, lit_timestamp_nano, now, union,
window_function, Aggregate, AggregateFunction, AggregateUDF, Between, BuiltInWindowFunction,
BuiltinScalarFunction, EmptyRelation, Explain, Expr, ExprSchemable, Extension, GetIndexedField,
LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ScalarUDF, TableSource, ToStringifiedPlan,
WindowFrame, WindowFrameBound, WindowFrameUnits,
LogicalPlan, LogicalPlanBuilder, Operator, PlanType, Projection, ScalarUDF, TableSource,
ToStringifiedPlan, WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use datafusion::prelude::{cast, sum, when, Column};
use datafusion_util::{lit_dict, AsExpr};
@ -39,7 +39,6 @@ use influxdb_influxql_parser::explain::{ExplainOption, ExplainStatement};
use influxdb_influxql_parser::expression::walk::{walk_expr, walk_expression, Expression};
use influxdb_influxql_parser::expression::{
Binary, Call, ConditionalBinary, ConditionalExpression, ConditionalOperator, VarRef,
VarRefDataType,
};
use influxdb_influxql_parser::functions::{
is_aggregate_function, is_now_function, is_scalar_math_function,
@ -181,7 +180,7 @@ impl<'a> Context<'a> {
}
}
/// Returns the combined GROUP BY tags clause from the root
/// Returns the combined `GROUP BY` tags clause from the root
/// and current statement. The list is sorted and guaranteed to be unique.
fn group_by_tags(&self) -> Vec<&str> {
match (self.root_group_by_tags.is_empty(), self.group_by) {
@ -315,18 +314,29 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
vec![]
};
let tables = find_tables(select);
let ProjectionInfo {
fields,
group_by_tag_set,
projection_tag_set,
is_projected,
} = ProjectionInfo::new(&select.fields, &group_by_tags);
let table_names = find_table_names(select);
let sort_by_measurement = table_names.len() > 1;
let mut plans = Vec::new();
for table_name in tables {
for table_name in table_names {
let ctx = Context::new(table_name)
.with_projection_type(select.projection_type)
.with_timezone(select.timezone)
.with_group_by_fill(select)
.with_root_group_by_tags(&group_by_tags);
if let Some(plan) = self.select_to_plan(&ctx, select)? {
plans.push((table_name, plan));
}
let Some(plan) = self.union_from(&ctx, select)? else {
continue;
};
let plan = self.project_select(&ctx, plan, &fields, &group_by_tag_set)?;
plans.push((table_name, plan));
}
let plan = {
@ -334,19 +344,26 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
table_name: &str,
input: LogicalPlan,
) -> Result<LogicalPlan> {
// Prepare new projection with measurement
let proj_exprs =
iter::once(lit_dict(table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME))
.chain(
input
.schema()
.fields()
.iter()
.map(|expr| Expr::Column(expr.unqualified_column())),
)
.collect::<Vec<_>>();
project(input, proj_exprs)
if let LogicalPlan::Projection(Projection { expr, input, .. }) = input {
// Rewrite the existing projection with the measurement name column first
project(
input.deref().clone(),
iter::once(lit_dict(table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME))
.chain(expr),
)
} else {
project(
input.clone(),
iter::once(lit_dict(table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME))
.chain(
input
.schema()
.fields()
.iter()
.map(|expr| Expr::Column(expr.unqualified_column())),
),
)
}
}
let mut iter = plans.into_iter();
@ -376,66 +393,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
iter.try_fold(plan, |prev, (table_name, input)| {
let next = project_with_measurement(table_name, input)?;
LogicalPlanBuilder::from(prev).union(next)?.build()
union(prev, next)
})?
};
// Skip the `time` column
let fields_no_time = &select.fields[1..];
// always start with the time column
let mut fields = vec![select.fields.first().cloned().unwrap()];
// group_by_tag_set : a list of tag columns specified in the GROUP BY clause
// projection_tag_set : a list of tag columns specified exclusively in the SELECT projection
// is_projected : a list of booleans indicating whether matching elements in the
// group_by_tag_set are also projected in the query
let (group_by_tag_set, projection_tag_set, is_projected) = if group_by_tags.is_empty() {
let tag_columns = find_tag_and_unknown_columns(fields_no_time)
.sorted()
.collect::<Vec<_>>();
(vec![], tag_columns, vec![])
} else {
let mut tag_columns =
find_tag_and_unknown_columns(fields_no_time).collect::<HashSet<_>>();
// Find the list of tag keys specified in the `GROUP BY` clause, and
// whether any of the tag keys are also projected in the SELECT list.
let (tag_set, is_projected): (Vec<_>, Vec<_>) = group_by_tags
.into_iter()
.map(|s| (s, tag_columns.contains(s)))
.unzip();
// Tags specified in the `GROUP BY` clause that are not already added to the
// projection must be projected, so they can be used in the group key.
//
// At the end of the loop, the `tag_columns` set will contain the tag columns that
// exist in the projection and not in the `GROUP BY`.
fields.extend(
tag_set
.iter()
.filter_map(|col| match tag_columns.remove(*col) {
true => None,
false => Some(Field {
expr: IQLExpr::VarRef(VarRef {
name: (*col).into(),
data_type: Some(VarRefDataType::Tag),
}),
name: col.to_string(),
data_type: Some(InfluxColumnType::Tag),
}),
}),
);
(
tag_set,
tag_columns.into_iter().sorted().collect::<Vec<_>>(),
is_projected,
)
};
fields.extend(fields_no_time.iter().cloned());
let plan = plan_with_metadata(
plan,
&InfluxQlMetadata {
@ -460,77 +421,38 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
false,
);
plan_with_sort(
let plan = plan_with_sort(
plan,
vec![time_sort_expr.clone()],
sort_by_measurement,
&group_by_tag_set,
&projection_tag_set,
)?;
self.limit(
plan,
select.offset,
select.limit,
vec![time_sort_expr],
true,
sort_by_measurement,
&group_by_tag_set,
&projection_tag_set,
)
}
fn select_to_plan(&self, ctx: &Context<'_>, select: &Select) -> Result<Option<LogicalPlan>> {
// Skip the `time` column
let fields_no_time = &select.fields[1..];
// always start with the time column
let mut fields = vec![select.fields.first().cloned().unwrap()];
// group_by_tag_set : a list of tag columns specified in the GROUP BY clause
// projection_tag_set : a list of tag columns specified exclusively in the SELECT projection
// is_projected : a list of booleans indicating whether matching elements in the
// group_by_tag_set are also projected in the query
let group_by_tags = ctx.group_by_tags();
let (group_by_tag_set, projection_tag_set, is_projected) = if group_by_tags.is_empty() {
let tag_columns = find_tag_and_unknown_columns(fields_no_time)
.sorted()
.collect::<Vec<_>>();
(vec![], tag_columns, vec![])
} else {
let mut tag_columns =
find_tag_and_unknown_columns(fields_no_time).collect::<HashSet<_>>();
// Find the list of tag keys specified in the `GROUP BY` clause, and
// whether any of the tag keys are also projected in the SELECT list.
let (tag_set, is_projected): (Vec<_>, Vec<_>) = group_by_tags
.into_iter()
.map(|s| (s, tag_columns.contains(s)))
.unzip();
// Tags specified in the `GROUP BY` clause that are not already added to the
// projection must be projected, so they can be used in the group key.
//
// At the end of the loop, the `tag_columns` set will contain the tag columns that
// exist in the projection and not in the `GROUP BY`.
fields.extend(
tag_set
.iter()
.filter_map(|col| match tag_columns.remove(*col) {
true => None,
false => Some(Field {
expr: IQLExpr::VarRef(VarRef {
name: (*col).into(),
data_type: Some(VarRefDataType::Tag),
}),
name: col.to_string(),
data_type: Some(InfluxColumnType::Tag),
}),
}),
);
(
tag_set,
tag_columns.into_iter().sorted().collect::<Vec<_>>(),
is_projected,
)
};
fields.extend(fields_no_time.iter().cloned());
fn subquery_to_plan(&self, ctx: &Context<'_>, select: &Select) -> Result<Option<LogicalPlan>> {
let Some(plan) = self.union_from(ctx, select)? else {
return Ok(None)
};
let group_by_tags = ctx.group_by_tags();
let ProjectionInfo {
fields,
group_by_tag_set,
projection_tag_set,
..
} = ProjectionInfo::new(&select.fields, &group_by_tags);
let plan = self.project_select(ctx, plan, &fields, &group_by_tag_set)?;
// the sort planner node must refer to the time column using
@ -554,7 +476,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
&projection_tag_set,
)?;
let plan = self.limit(
Ok(Some(self.limit(
plan,
select.offset,
select.limit,
@ -562,12 +484,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
false,
&group_by_tag_set,
&projection_tag_set,
)?;
Ok(Some(plan))
)?))
}
///
/// Returns a `LogicalPlan` that combines the `FROM` clause as a `UNION ALL`.
fn union_from(&self, ctx: &Context<'_>, select: &Select) -> Result<Option<LogicalPlan>> {
let mut plans = Vec::new();
for ds in &select.from {
@ -608,9 +528,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let plan = iter
.next()
.ok_or_else(|| error::map::internal("expected plan"))?;
iter.try_fold(plan, |prev, next| {
LogicalPlanBuilder::from(prev).union(next)?.build()
})?
iter.try_fold(plan, union)?
};
Some(plan)
}
@ -848,8 +766,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
/// - `offset`: The number of input rows to skip.
/// - `limit`: The maximum number of rows to return in the output plan per group.
/// - `time_sort_expr`: An `Expr::Sort` referring to the `time` column of the input.
/// - `has_multiple_measurements`: `true` if the `input` produces multiple measurements,
/// and therefore the limit should be applied per measurement and any additional group tags.
/// - `sort_by_measurement`: `true` if the `input` must be sorted by the measurement column.
/// - `group_by_tag_set`: Tag columns from the `input` plan that should be used to partition
/// the `input` plan and sort the `output` plan.
/// - `projection_tag_set`: Additional tag columns that should be used to sort the `output`
@ -861,7 +778,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
offset: Option<OffsetClause>,
limit: Option<LimitClause>,
sort_exprs: Vec<Expr>,
has_multiple_measurements: bool,
sort_by_measurement: bool,
group_by_tag_set: &[&str],
projection_tag_set: &[&str],
) -> Result<LogicalPlan> {
@ -869,7 +786,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
return Ok(input);
}
if group_by_tag_set.is_empty() && !has_multiple_measurements {
if group_by_tag_set.is_empty() && !sort_by_measurement {
// If the query is not grouping by tags, and is a single measurement, the DataFusion
// Limit operator is sufficient.
let skip = offset.map_or(0, |v| *v as usize);
@ -893,7 +810,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// ) AS iox::row
let order_by = sort_exprs.clone();
let partition_by = if has_multiple_measurements {
let partition_by = if sort_by_measurement {
iter::once(INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr())
.chain(fields_to_exprs_no_nulls(input.schema(), group_by_tag_set))
.collect::<Vec<_>>()
@ -974,7 +891,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
plan_with_sort(
plan,
sort_exprs,
has_multiple_measurements,
sort_by_measurement,
group_by_tag_set,
projection_tag_set,
)
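
To make the per-group behaviour above concrete: the sketch below is not the planner code, but a standalone toy (with hypothetical measurements and tags) showing the OFFSET/LIMIT-per-(measurement, group-tags) semantics that the ROW_NUMBER window plan implements.

use std::collections::HashMap;

// A toy row: (measurement, tag value, time). The values are hypothetical.
type Row = (&'static str, &'static str, i64);

// Keep at most `limit` rows per (measurement, tag) group after skipping
// `offset` rows — the effect the planner achieves with a ROW_NUMBER window
// partitioned by the measurement column and the group-by tags.
fn limit_per_group(rows: &[Row], offset: usize, limit: usize) -> Vec<Row> {
    let mut seen: HashMap<(&str, &str), usize> = HashMap::new();
    rows.iter()
        .copied()
        .filter(|&(m, tag, _)| {
            let n = seen.entry((m, tag)).or_insert(0);
            *n += 1;
            // Row numbers are 1-based, mirroring ROW_NUMBER().
            *n > offset && *n <= offset + limit
        })
        .collect()
}

fn main() {
    let rows = [
        ("cpu", "host1", 1),
        ("cpu", "host1", 2),
        ("cpu", "host1", 3),
        ("cpu", "host2", 1),
        ("disk", "host1", 1),
        ("disk", "host1", 2),
    ];
    // OFFSET 1 LIMIT 1, applied independently to each (measurement, host) group.
    assert_eq!(
        limit_per_group(&rows, 1, 1),
        vec![("cpu", "host1", 2), ("disk", "host1", 2)]
    );
}
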
@ -1354,8 +1271,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
}
/// Generate a list of logical plans for each of the tables references in the `FROM`
/// clause.
/// Generate a logical plan for the specified `DataSource`.
fn plan_from_data_source(
&self,
ctx: &Context<'_>,
@ -1378,7 +1294,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
.with_group_by_fill(select)
.with_root_group_by_tags(ctx.root_group_by_tags);
Ok(self.select_to_plan(&ctx, select)?)
Ok(self.subquery_to_plan(&ctx, select)?)
}
}
}
@ -2229,15 +2145,6 @@ fn is_aggregate_field(f: &Field) -> bool {
.is_break()
}
/// Find all the columns where the resolved data type
/// is a tag or is [`None`], which is unknown.
fn find_tag_and_unknown_columns(fields: &[Field]) -> impl Iterator<Item = &str> {
fields.iter().filter_map(|f| match f.data_type {
Some(InfluxColumnType::Tag) | None => Some(f.name.as_str()),
_ => None,
})
}
fn conditional_op_to_operator(op: ConditionalOperator) -> Result<Operator> {
match op {
ConditionalOperator::Eq => Ok(Operator::Eq),
@ -2472,7 +2379,7 @@ mod test {
#[test]
fn test_find_var_refs() {
use VarRefDataType::*;
use influxdb_influxql_parser::expression::VarRefDataType::*;
macro_rules! var_ref {
($NAME: literal) => {

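One way to picture the `project_with_measurement` rewrite earlier in this file, outside of DataFusion: if the per-table plan is already a projection, the measurement column is spliced in as its first expression; otherwise the plan is wrapped in a new projection. A minimal standalone sketch under those assumptions (the `Plan` type here is a toy, not the crate's `LogicalPlan`):

// A stripped-down logical plan, just enough to mirror the branch above
// (these names are illustrative; they are not the crate's types).
enum Plan {
    // A projection over an input plan: the list of output column names.
    Projection(Vec<String>, Box<Plan>),
    // Any other node, carrying the column names it produces.
    Scan(Vec<String>),
}

// Prepend the measurement column. If the input is already a projection,
// splice the column into it instead of stacking a second projection on top.
fn project_with_measurement(table_name: &str, plan: Plan) -> Plan {
    let measurement = format!("'{table_name}' AS iox::measurement");
    match plan {
        Plan::Projection(mut cols, input) => {
            cols.insert(0, measurement);
            Plan::Projection(cols, input)
        }
        Plan::Scan(cols) => {
            // Fall back to wrapping the node in a new projection that selects
            // the measurement literal followed by the existing columns.
            let projected = std::iter::once(measurement)
                .chain(cols.iter().cloned())
                .collect();
            Plan::Projection(projected, Box::new(Plan::Scan(cols)))
        }
    }
}

fn main() {
    let plan = Plan::Projection(
        vec!["time".to_string(), "usage_idle".to_string()],
        Box::new(Plan::Scan(vec!["time".to_string(), "usage_idle".to_string()])),
    );
    match project_with_measurement("cpu", plan) {
        // The measurement column was spliced into the existing projection,
        // so there is still only a single projection node.
        Plan::Projection(cols, _) => assert_eq!(cols[0], "'cpu' AS iox::measurement"),
        Plan::Scan(_) => unreachable!(),
    }
}
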

@ -5,8 +5,9 @@ use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_util::AsExpr;
use generated_types::influxdata::iox::querier::v1::influx_ql_metadata::TagKeyColumn;
use influxdb_influxql_parser::expression::{Expr as IQLExpr, VarRef, VarRefDataType};
use schema::INFLUXQL_MEASUREMENT_COLUMN_NAME;
use std::collections::HashMap;
use itertools::Itertools;
use schema::{InfluxColumnType, INFLUXQL_MEASUREMENT_COLUMN_NAME};
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
pub(super) fn make_tag_key_column_meta(
@ -60,10 +61,6 @@ pub(super) fn plan_with_sort(
group_by_tag_set: &[&str],
projection_tag_set: &[&str],
) -> Result<LogicalPlan> {
// If there are multiple measurements, we need to sort by the measurement column
// NOTE: Ideally DataFusion would maintain the order of the UNION ALL, which would eliminate
// the need to sort by measurement.
// See: https://github.com/influxdata/influxdb_iox/issues/7062
let mut series_sort = if sort_by_measurement {
vec![Expr::sort(
INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr(),
@ -113,3 +110,92 @@ pub(super) fn fields_to_exprs_no_nulls<'a>(
})
.map(|f| f.as_expr())
}
/// Contains an expanded `SELECT` projection
pub(super) struct ProjectionInfo<'a> {
/// A copy of the `SELECT` fields that includes tags from the `GROUP BY` that were not
/// specified in the original `SELECT` projection.
pub(super) fields: Vec<Field>,
/// A list of tag column names specified in the `GROUP BY` clause.
pub(super) group_by_tag_set: Vec<&'a str>,
/// A list of tag column names specified exclusively in the `SELECT` projection.
pub(super) projection_tag_set: Vec<&'a str>,
/// A list of booleans indicating whether matching elements in the
/// `group_by_tag_set` are also projected in the query.
pub(super) is_projected: Vec<bool>,
}
impl<'a> ProjectionInfo<'a> {
/// Computes a `ProjectionInfo` from the specified `fields` and `group_by_tags`.
pub(super) fn new(fields: &'a [Field], group_by_tags: &'a [&'a str]) -> Self {
// Skip the `time` column
let fields_no_time = &fields[1..];
// always start with the time column
let mut fields = vec![fields.first().cloned().unwrap()];
let (group_by_tag_set, projection_tag_set, is_projected) = if group_by_tags.is_empty() {
let tag_columns = find_tag_and_unknown_columns(fields_no_time)
.sorted()
.collect::<Vec<_>>();
(vec![], tag_columns, vec![])
} else {
let mut tag_columns =
find_tag_and_unknown_columns(fields_no_time).collect::<HashSet<_>>();
// Find the list of tag keys specified in the `GROUP BY` clause, and
// whether any of the tag keys are also projected in the SELECT list.
let (tag_set, is_projected): (Vec<_>, Vec<_>) = group_by_tags
.iter()
.map(|s| (*s, tag_columns.contains(s)))
.unzip();
// Tags specified in the `GROUP BY` clause that are not already added to the
// projection must be projected, so they can be used in the group key.
//
// At the end of the loop, the `tag_columns` set will contain the tag columns that
// exist in the projection and not in the `GROUP BY`.
fields.extend(
tag_set
.iter()
.filter_map(|col| match tag_columns.remove(*col) {
true => None,
false => Some(Field {
expr: IQLExpr::VarRef(VarRef {
name: (*col).into(),
data_type: Some(VarRefDataType::Tag),
}),
name: col.to_string(),
data_type: Some(InfluxColumnType::Tag),
}),
}),
);
(
tag_set,
tag_columns.into_iter().sorted().collect::<Vec<_>>(),
is_projected,
)
};
fields.extend(fields_no_time.iter().cloned());
Self {
fields,
group_by_tag_set,
projection_tag_set,
is_projected,
}
}
}
/// Find all the columns where the resolved data type
/// is a tag or is [`None`], which is unknown.
fn find_tag_and_unknown_columns(fields: &[Field]) -> impl Iterator<Item = &str> {
fields.iter().filter_map(|f| match f.data_type {
Some(InfluxColumnType::Tag) | None => Some(f.name.as_str()),
_ => None,
})
}
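
A standalone illustration of the tag bookkeeping that `ProjectionInfo::new` performs, using plain string slices instead of the crate's `Field` type; it omits the step that synthesizes `Field` entries for group-by tags missing from the projection, and all names are hypothetical:

use std::collections::HashSet;

// Split tag columns the way `ProjectionInfo::new` does: tags named in
// `GROUP BY` form the group-by set, tags that appear only in the SELECT
// projection form the projection set, and `is_projected` records which
// group-by tags were also selected explicitly.
fn partition_tags<'a>(
    projected_tags: &[&'a str],
    group_by_tags: &[&'a str],
) -> (Vec<&'a str>, Vec<&'a str>, Vec<bool>) {
    let mut remaining: HashSet<&str> = projected_tags.iter().copied().collect();

    let (group_by_tag_set, is_projected): (Vec<_>, Vec<_>) = group_by_tags
        .iter()
        .map(|t| (*t, remaining.contains(t)))
        .unzip();

    // Tags that appear in both sets belong to the group-by set only.
    for t in &group_by_tag_set {
        remaining.remove(t);
    }

    let mut projection_tag_set: Vec<&str> = remaining.into_iter().collect();
    projection_tag_set.sort_unstable();

    (group_by_tag_set, projection_tag_set, is_projected)
}

fn main() {
    // SELECT projects tags "host" and "region"; GROUP BY names "host" and "cpu".
    let (group_by, projection, is_projected) =
        partition_tags(&["host", "region"], &["host", "cpu"]);
    assert_eq!(group_by, ["host", "cpu"]);
    assert_eq!(projection, ["region"]);
    assert_eq!(is_projected, [true, false]); // "host" is projected, "cpu" is not
}
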


@ -38,7 +38,9 @@ pub(super) fn rewrite_statement(
Ok(SelectQuery { select })
}
pub(super) fn find_tables(s: &Select) -> Vec<&str> {
/// Find the unique list of table names used by `s`, recursively following all `FROM` clauses,
/// and return the results in lexicographically ascending order.
pub(super) fn find_table_names(s: &Select) -> Vec<&str> {
let mut data_sources = vec![s.from.as_slice()];
let mut tables = Vec::new();
while let Some(from) = data_sources.pop() {
@ -1512,7 +1514,7 @@ mod test {
use super::Result;
use crate::plan::ir::{Field, Select};
use crate::plan::rewriter::{
find_tables, has_wildcards, rewrite_select, rewrite_statement, ProjectionType,
find_table_names, has_wildcards, rewrite_select, rewrite_statement, ProjectionType,
SelectStatementInfo,
};
use crate::plan::test_utils::{parse_select, MockSchemaProvider};
@ -1522,7 +1524,7 @@ mod test {
use test_helpers::{assert_contains, assert_error};
#[test]
fn test_find_tables() {
fn test_find_table_names() {
let namespace = MockSchemaProvider::default();
let parse_select = |s: &str| -> Select {
let select = parse_select(s);
@ -1530,21 +1532,21 @@ mod test {
};
let s = parse_select("SELECT usage_idle FROM cpu");
assert_eq!(find_tables(&s), &["cpu"]);
assert_eq!(find_table_names(&s), &["cpu"]);
let s = parse_select("SELECT usage_idle FROM cpu, disk");
assert_eq!(find_tables(&s), &["cpu", "disk"]);
assert_eq!(find_table_names(&s), &["cpu", "disk"]);
let s = parse_select("SELECT usage_idle FROM disk, cpu, disk");
assert_eq!(find_tables(&s), &["cpu", "disk"]);
assert_eq!(find_table_names(&s), &["cpu", "disk"]);
// subqueries
let s = parse_select("SELECT usage_idle FROM (select * from cpu, disk)");
assert_eq!(find_tables(&s), &["cpu", "disk"]);
assert_eq!(find_table_names(&s), &["cpu", "disk"]);
let s = parse_select("SELECT usage_idle FROM cpu, (select * from cpu, disk)");
assert_eq!(find_tables(&s), &["cpu", "disk"]);
assert_eq!(find_table_names(&s), &["cpu", "disk"]);
}
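
The behaviour these tests pin down can be sketched independently of the rewriter: walk nested `FROM` clauses, collect table names, and return them deduplicated in ascending order. A minimal stand-in (not the crate's `DataSource`):

// A stripped-down stand-in for the planner's `DataSource`: a FROM entry is
// either a named table or a subquery with its own FROM list.
enum FromEntry {
    Table(String),
    Subquery(Vec<FromEntry>),
}

// Collect the unique table names reachable from `from`, recursing through
// subqueries, and return them in lexicographically ascending order.
fn find_table_names(from: &[FromEntry]) -> Vec<&str> {
    let mut pending: Vec<&[FromEntry]> = vec![from];
    let mut names = Vec::new();
    while let Some(level) = pending.pop() {
        for entry in level {
            match entry {
                FromEntry::Table(name) => names.push(name.as_str()),
                FromEntry::Subquery(inner) => pending.push(inner),
            }
        }
    }
    names.sort_unstable();
    names.dedup();
    names
}

fn main() {
    // Shaped like: SELECT usage_idle FROM cpu, (SELECT * FROM cpu, disk)
    let from = vec![
        FromEntry::Table("cpu".to_string()),
        FromEntry::Subquery(vec![
            FromEntry::Table("cpu".to_string()),
            FromEntry::Table("disk".to_string()),
        ]),
    ];
    assert_eq!(find_table_names(&from), ["cpu", "disk"]);
}
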
#[test]