Merge pull request #7727 from influxdata/sgc/issue/subquery_6891_03
chore: Ensure time column is projected in subqueries
pull/24376/head
commit
90186e1937
|
@ -194,6 +194,22 @@ impl Display for Expr {
|
|||
}
|
||||
}
|
||||
|
||||
/// Traits to help creating InfluxQL [`Expr`]s containing
|
||||
/// a [`VarRef`].
|
||||
pub trait AsVarRefExpr {
|
||||
/// Creates an InfluxQL [`VarRef`] expression.
|
||||
fn to_var_ref_expr(&self) -> Expr;
|
||||
}
|
||||
|
||||
impl AsVarRefExpr for str {
|
||||
fn to_var_ref_expr(&self) -> Expr {
|
||||
Expr::VarRef(VarRef {
|
||||
name: self.into(),
|
||||
data_type: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Specifies the data type of a wildcard (`*`) when using the `::` operator.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum WildcardType {
|
||||
|
|
|
@ -24,9 +24,15 @@ SELECT /64|tag0/ FROM m0;
|
|||
-- Projection specific tags and fields
|
||||
SELECT f64, tag0 FROM m0;
|
||||
|
||||
-- Explicitly select time column
|
||||
-- Explicitly select time column, should appear in first column
|
||||
SELECT f64, tag0, time FROM m0;
|
||||
|
||||
-- Alias time column
|
||||
SELECT f64, tag0, time as timestamp FROM m0;
|
||||
|
||||
-- Alias field and tag columns
|
||||
SELECT f64 as f, tag0 as t FROM m0;
|
||||
|
||||
-- arithmetic operators
|
||||
SELECT f64, f64 * 2, i64, i64 + i64 FROM m0;
|
||||
|
||||
|
|
|
@ -70,17 +70,43 @@ name: m0
|
|||
+---------------------+------+-------+
|
||||
-- InfluxQL: SELECT f64, tag0, time FROM m0;
|
||||
name: m0
|
||||
+------+-------+---------------------+
|
||||
| f64 | tag0 | time |
|
||||
+------+-------+---------------------+
|
||||
| 10.1 | val00 | 2022-10-31T02:00:00 |
|
||||
| 11.3 | val01 | 2022-10-31T02:00:00 |
|
||||
| 10.4 | val02 | 2022-10-31T02:00:00 |
|
||||
| 21.2 | val00 | 2022-10-31T02:00:10 |
|
||||
| 18.9 | val00 | 2022-10-31T02:00:10 |
|
||||
| 11.2 | val00 | 2022-10-31T02:00:20 |
|
||||
| 19.2 | val00 | 2022-10-31T02:00:30 |
|
||||
+------+-------+---------------------+
|
||||
+---------------------+------+-------+
|
||||
| time | f64 | tag0 |
|
||||
+---------------------+------+-------+
|
||||
| 2022-10-31T02:00:00 | 10.1 | val00 |
|
||||
| 2022-10-31T02:00:00 | 11.3 | val01 |
|
||||
| 2022-10-31T02:00:00 | 10.4 | val02 |
|
||||
| 2022-10-31T02:00:10 | 21.2 | val00 |
|
||||
| 2022-10-31T02:00:10 | 18.9 | val00 |
|
||||
| 2022-10-31T02:00:20 | 11.2 | val00 |
|
||||
| 2022-10-31T02:00:30 | 19.2 | val00 |
|
||||
+---------------------+------+-------+
|
||||
-- InfluxQL: SELECT f64, tag0, time as timestamp FROM m0;
|
||||
name: m0
|
||||
+---------------------+------+-------+
|
||||
| timestamp | f64 | tag0 |
|
||||
+---------------------+------+-------+
|
||||
| 2022-10-31T02:00:00 | 10.1 | val00 |
|
||||
| 2022-10-31T02:00:00 | 11.3 | val01 |
|
||||
| 2022-10-31T02:00:00 | 10.4 | val02 |
|
||||
| 2022-10-31T02:00:10 | 21.2 | val00 |
|
||||
| 2022-10-31T02:00:10 | 18.9 | val00 |
|
||||
| 2022-10-31T02:00:20 | 11.2 | val00 |
|
||||
| 2022-10-31T02:00:30 | 19.2 | val00 |
|
||||
+---------------------+------+-------+
|
||||
-- InfluxQL: SELECT f64 as f, tag0 as t FROM m0;
|
||||
name: m0
|
||||
+---------------------+------+-------+
|
||||
| time | f | t |
|
||||
+---------------------+------+-------+
|
||||
| 2022-10-31T02:00:00 | 10.1 | val00 |
|
||||
| 2022-10-31T02:00:00 | 11.3 | val01 |
|
||||
| 2022-10-31T02:00:00 | 10.4 | val02 |
|
||||
| 2022-10-31T02:00:10 | 21.2 | val00 |
|
||||
| 2022-10-31T02:00:10 | 18.9 | val00 |
|
||||
| 2022-10-31T02:00:20 | 11.2 | val00 |
|
||||
| 2022-10-31T02:00:30 | 19.2 | val00 |
|
||||
+---------------------+------+-------+
|
||||
-- InfluxQL: SELECT f64, f64 * 2, i64, i64 + i64 FROM m0;
|
||||
name: m0
|
||||
+---------------------+------+-------+-----+---------+
|
||||
|
|
|
@ -24,7 +24,11 @@ pub(crate) mod map {
|
|||
#[derive(Debug, Error)]
|
||||
enum PlannerError {
|
||||
/// An unexpected error that represents a bug in IOx.
|
||||
#[error("internal: {0}")]
|
||||
///
|
||||
/// The message is prefixed with `InfluxQL internal error: `,
|
||||
/// which may be used by clients to identify internal InfluxQL
|
||||
/// errors.
|
||||
#[error("InfluxQL internal error: {0}")]
|
||||
Internal(String),
|
||||
}
|
||||
|
||||
|
@ -42,4 +46,17 @@ pub(crate) mod map {
|
|||
pub(crate) fn not_implemented(feature: impl Into<String>) -> DataFusionError {
|
||||
DataFusionError::NotImplemented(feature.into())
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod test {
    use crate::plan::error::map::PlannerError;

    #[test]
    fn test_planner_error_display() {
        // Internal errors must render with the `InfluxQL internal error: ` prefix.
        let rendered = PlannerError::Internal("****".to_owned()).to_string();

        assert!(rendered.starts_with("InfluxQL internal error: "));
    }
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
mod select;
|
||||
|
||||
use crate::plan::planner::select::{
|
||||
check_exprs_satisfy_columns, fields_to_exprs_no_nulls, make_tag_key_column_meta,
|
||||
plan_with_sort, ToSortExpr,
|
||||
check_exprs_satisfy_columns, fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort,
|
||||
};
|
||||
use crate::plan::planner_time_range_expression::{
|
||||
duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr,
|
||||
|
@ -11,7 +10,7 @@ use crate::plan::rewriter::{
|
|||
rewrite_statement, select_statement_info, ProjectionType, SelectStatementInfo,
|
||||
};
|
||||
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
|
||||
use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_to_data_type};
|
||||
use crate::plan::var_ref::{data_type_to_var_ref_data_type, var_ref_data_type_to_data_type};
|
||||
use crate::plan::{error, planner_rewrite_expression};
|
||||
use arrow::array::{StringBuilder, StringDictionaryBuilder};
|
||||
use arrow::datatypes::{DataType, Field as ArrowField, Int32Type, Schema as ArrowSchema};
|
||||
|
@ -34,7 +33,7 @@ use datafusion::logical_expr::{
|
|||
use datafusion::prelude::{cast, sum, when, Column};
|
||||
use datafusion_util::{lit_dict, AsExpr};
|
||||
use generated_types::influxdata::iox::querier::v1::InfluxQlMetadata;
|
||||
use influxdb_influxql_parser::common::{LimitClause, OffsetClause};
|
||||
use influxdb_influxql_parser::common::{LimitClause, OffsetClause, OrderByClause};
|
||||
use influxdb_influxql_parser::explain::{ExplainOption, ExplainStatement};
|
||||
use influxdb_influxql_parser::expression::walk::walk_expr;
|
||||
use influxdb_influxql_parser::expression::{
|
||||
|
@ -59,7 +58,7 @@ use influxdb_influxql_parser::{
|
|||
expression::Expr as IQLExpr,
|
||||
identifier::Identifier,
|
||||
literal::Literal,
|
||||
select::{Field, FieldList, FromMeasurementClause, MeasurementSelection, SelectStatement},
|
||||
select::{Field, FromMeasurementClause, MeasurementSelection, SelectStatement},
|
||||
statement::Statement,
|
||||
};
|
||||
use iox_query::config::{IoxConfigExt, MetadataCutoff};
|
||||
|
@ -284,18 +283,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
.with_timezone(select.timezone)
|
||||
.with_group_by_fill(select);
|
||||
|
||||
// The `time` column is always present in the result set
|
||||
let mut fields = if find_time_column_index(&select.fields).is_none() {
|
||||
vec![Field {
|
||||
expr: IQLExpr::VarRef(VarRef {
|
||||
name: "time".into(),
|
||||
data_type: Some(VarRefDataType::Timestamp),
|
||||
}),
|
||||
alias: Some("time".into()),
|
||||
}]
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
// Skip the `time` column
|
||||
let fields_no_time = &select.fields[1..];
|
||||
// always start with the time column
|
||||
let mut fields = vec![select.fields.first().cloned().unwrap()];
|
||||
|
||||
// group_by_tag_set : a list of tag columns specified in the GROUP BY clause
|
||||
// projection_tag_set : a list of tag columns specified exclusively in the SELECT projection
|
||||
|
@ -304,7 +295,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
let (group_by_tag_set, projection_tag_set, is_projected) =
|
||||
if let Some(group_by) = &select.group_by {
|
||||
let mut tag_columns =
|
||||
find_tag_and_unknown_columns(&select.fields).collect::<HashSet<_>>();
|
||||
find_tag_and_unknown_columns(fields_no_time).collect::<HashSet<_>>();
|
||||
|
||||
// Find the list of tag keys specified in the `GROUP BY` clause, and
|
||||
// whether any of the tag keys are also projected in the SELECT list.
|
||||
|
@ -344,13 +335,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
is_projected,
|
||||
)
|
||||
} else {
|
||||
let tag_columns = find_tag_and_unknown_columns(&select.fields)
|
||||
let tag_columns = find_tag_and_unknown_columns(fields_no_time)
|
||||
.sorted()
|
||||
.collect::<Vec<_>>();
|
||||
(vec![], tag_columns, vec![])
|
||||
};
|
||||
|
||||
fields.extend(select.fields.iter().cloned());
|
||||
fields.extend(fields_no_time.iter().cloned());
|
||||
|
||||
// Build the first non-empty plan
|
||||
let plan = {
|
||||
|
@ -427,9 +418,26 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
// The UNION operator indicates the result set produces multiple tables or measurements.
|
||||
let is_multiple_measurements = matches!(plan, LogicalPlan::Union(_));
|
||||
|
||||
// the sort planner node must refer to the time column using
|
||||
// the alias that was specified
|
||||
let time_alias = fields[0]
|
||||
.alias
|
||||
.as_ref()
|
||||
.map(|id| id.deref().as_str())
|
||||
.unwrap_or("time");
|
||||
|
||||
let time_sort_expr = time_alias.as_expr().sort(
|
||||
match select.order_by {
|
||||
// Default behaviour is to sort by time in ascending order if there is no ORDER BY
|
||||
None | Some(OrderByClause::Ascending) => true,
|
||||
Some(OrderByClause::Descending) => false,
|
||||
},
|
||||
false,
|
||||
);
|
||||
|
||||
let plan = plan_with_sort(
|
||||
plan,
|
||||
vec![select.order_by.to_sort_expr()],
|
||||
vec![time_sort_expr.clone()],
|
||||
is_multiple_measurements,
|
||||
&group_by_tag_set,
|
||||
&projection_tag_set,
|
||||
|
@ -439,7 +447,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
plan,
|
||||
select.offset,
|
||||
select.limit,
|
||||
vec![select.order_by.to_sort_expr()],
|
||||
vec![time_sort_expr],
|
||||
is_multiple_measurements,
|
||||
&group_by_tag_set,
|
||||
&projection_tag_set,
|
||||
|
@ -599,11 +607,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
|
||||
// Exclude tags that do not exist in the current table schema.
|
||||
group_by_exprs.extend(group_by_tag_set.iter().filter_map(|name| {
|
||||
if schemas
|
||||
.iox_schema
|
||||
.field_by_name(name)
|
||||
.map_or(false, |(dt, _)| dt == InfluxColumnType::Tag)
|
||||
{
|
||||
if schemas.is_tag_field(name) {
|
||||
Some(name.as_expr())
|
||||
} else {
|
||||
None
|
||||
|
@ -991,7 +995,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
|
||||
/// Map an InfluxQL [`IQLExpr`] to a DataFusion [`Expr`].
|
||||
fn expr_to_df_expr(&self, ctx: &Context<'_>, iql: &IQLExpr, schemas: &Schemas) -> Result<Expr> {
|
||||
let iox_schema = &schemas.iox_schema;
|
||||
let schema = &schemas.df_schema;
|
||||
match iql {
|
||||
// rewriter is expected to expand wildcard expressions
|
||||
IQLExpr::Wildcard(_) => error::internal("unexpected wildcard in projection"),
|
||||
|
@ -1009,12 +1013,16 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
"time".as_expr()
|
||||
}
|
||||
(ExprScope::Projection, "time") => "time".as_expr(),
|
||||
(_, name) => match iox_schema.field_by_name(name) {
|
||||
Some((col_type, _)) => {
|
||||
(_, name) => match schema
|
||||
.fields_with_unqualified_name(name)
|
||||
.first()
|
||||
.map(|f| f.data_type().clone())
|
||||
{
|
||||
Some(col_type) => {
|
||||
let column = name.as_expr();
|
||||
let src_type = data_type_to_var_ref_data_type(col_type)?;
|
||||
match opt_dst_type {
|
||||
Some(dst_type) => {
|
||||
let src_type = column_type_to_var_ref_data_type(col_type);
|
||||
if src_type == *dst_type {
|
||||
column
|
||||
} else if src_type.is_numeric_type()
|
||||
|
@ -2095,7 +2103,7 @@ fn is_aggregate_field(f: &Field) -> bool {
|
|||
|
||||
/// Find all the columns where the resolved data type
|
||||
/// is a tag or is [`None`], which is unknown.
|
||||
fn find_tag_and_unknown_columns(fields: &FieldList) -> impl Iterator<Item = &str> {
|
||||
fn find_tag_and_unknown_columns(fields: &[Field]) -> impl Iterator<Item = &str> {
|
||||
fields.iter().filter_map(|f| match &f.expr {
|
||||
IQLExpr::VarRef(VarRef {
|
||||
name,
|
||||
|
@ -2657,6 +2665,23 @@ mod test {
|
|||
mod select {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_time_column() {
|
||||
// validate time column is explicitly projected
|
||||
assert_snapshot!(plan("SELECT usage_idle, time FROM cpu"), @r###"
|
||||
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), usage_idle:Float64;N]
|
||||
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), usage_idle:Float64;N]
|
||||
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
|
||||
"###);
|
||||
|
||||
// validate time column may be aliased
|
||||
assert_snapshot!(plan("SELECT usage_idle, time AS timestamp FROM cpu"), @r###"
|
||||
Sort: timestamp ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), timestamp:Timestamp(Nanosecond, None), usage_idle:Float64;N]
|
||||
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS timestamp, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), timestamp:Timestamp(Nanosecond, None), usage_idle:Float64;N]
|
||||
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
|
||||
"###);
|
||||
}
|
||||
|
||||
/// Tests for the `DISTINCT` clause and `DISTINCT` function
|
||||
#[test]
|
||||
fn test_distinct() {
|
||||
|
@ -3485,10 +3510,9 @@ mod test {
|
|||
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
|
||||
"###);
|
||||
assert_snapshot!(plan("SELECT time as timestamp, f64_field FROM data"), @r###"
|
||||
Projection: iox::measurement, timestamp, f64_field [iox::measurement:Dictionary(Int32, Utf8), timestamp:Timestamp(Nanosecond, None), f64_field:Float64;N]
|
||||
Sort: data.time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), timestamp:Timestamp(Nanosecond, None), f64_field:Float64;N, time:Timestamp(Nanosecond, None)]
|
||||
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, data.time AS timestamp, data.f64_field AS f64_field, data.time [iox::measurement:Dictionary(Int32, Utf8), timestamp:Timestamp(Nanosecond, None), f64_field:Float64;N, time:Timestamp(Nanosecond, None)]
|
||||
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
|
||||
Sort: timestamp ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), timestamp:Timestamp(Nanosecond, None), f64_field:Float64;N]
|
||||
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, data.time AS timestamp, data.f64_field AS f64_field [iox::measurement:Dictionary(Int32, Utf8), timestamp:Timestamp(Nanosecond, None), f64_field:Float64;N]
|
||||
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
|
||||
"###);
|
||||
assert_snapshot!(plan("SELECT foo, f64_field FROM data"), @r###"
|
||||
Sort: time ASC NULLS LAST, foo ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, f64_field:Float64;N]
|
||||
|
|
|
@ -5,7 +5,6 @@ use datafusion::logical_expr::utils::find_column_exprs;
|
|||
use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
|
||||
use datafusion_util::AsExpr;
|
||||
use generated_types::influxdata::iox::querier::v1::influx_ql_metadata::TagKeyColumn;
|
||||
use influxdb_influxql_parser::common::OrderByClause;
|
||||
use influxdb_influxql_parser::expression::{Expr as IQLExpr, VarRef, VarRefDataType};
|
||||
use influxdb_influxql_parser::select::Field;
|
||||
use schema::INFLUXQL_MEASUREMENT_COLUMN_NAME;
|
||||
|
@ -121,25 +120,6 @@ pub(super) fn plan_with_sort(
|
|||
LogicalPlanBuilder::from(plan).sort(series_sort)?.build()
|
||||
}
|
||||
|
||||
/// Trait to convert the receiver to a [`Expr::Sort`] expression.
|
||||
pub(super) trait ToSortExpr {
|
||||
/// Create a sort expression.
|
||||
fn to_sort_expr(&self) -> Expr;
|
||||
}
|
||||
|
||||
impl ToSortExpr for Option<OrderByClause> {
|
||||
fn to_sort_expr(&self) -> Expr {
|
||||
"time".as_expr().sort(
|
||||
match self {
|
||||
// Default behaviour is to sort by time in ascending order if there is no ORDER BY
|
||||
None | Some(OrderByClause::Ascending) => true,
|
||||
Some(OrderByClause::Descending) => false,
|
||||
},
|
||||
false,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Map the fields to DataFusion [`Expr::Column`] expressions, excluding those columns that
|
||||
/// are [`DataType::Null`]'s.
|
||||
pub(super) fn fields_to_exprs_no_nulls<'a>(
|
||||
|
|
|
@ -132,7 +132,6 @@ use datafusion::logical_expr::{
|
|||
binary_expr, cast, coalesce, lit, BinaryExpr, Expr, ExprSchemable, Operator,
|
||||
};
|
||||
use datafusion::optimizer::utils::{conjunction, disjunction};
|
||||
use schema::{InfluxColumnType, InfluxFieldType};
|
||||
|
||||
/// Perform a series of passes to rewrite `expr` in compliance with InfluxQL behavior
|
||||
/// in an effort to ensure the query executes without error.
|
||||
|
@ -770,19 +769,17 @@ impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> {
|
|||
op: op @ (Operator::RegexMatch | Operator::RegexNotMatch),
|
||||
right,
|
||||
}) => {
|
||||
if let Expr::Column(ref col) = *left {
|
||||
match self.schemas.iox_schema.field_by_name(&col.name) {
|
||||
Some((InfluxColumnType::Tag, _)) => {
|
||||
Ok(if let Expr::Column(ref col) = *left {
|
||||
match self.schemas.df_schema.field_from_column(col)?.data_type() {
|
||||
DataType::Dictionary(..) => {
|
||||
// Regular expressions expect to be compared with a Utf8
|
||||
let left =
|
||||
Box::new(left.cast_to(&DataType::Utf8, &self.schemas.df_schema)?);
|
||||
Ok(Expr::BinaryExpr(BinaryExpr { left, op, right }))
|
||||
}
|
||||
Some((InfluxColumnType::Field(InfluxFieldType::String), _)) => {
|
||||
Ok(Expr::BinaryExpr(BinaryExpr { left, op, right }))
|
||||
Expr::BinaryExpr(BinaryExpr { left, op, right })
|
||||
}
|
||||
DataType::Utf8 => Expr::BinaryExpr(BinaryExpr { left, op, right }),
|
||||
// Any other column type should evaluate to false
|
||||
_ => Ok(lit(false)),
|
||||
_ => lit(false),
|
||||
}
|
||||
} else {
|
||||
// If this is not a simple column expression, evaluate to false,
|
||||
|
@ -798,8 +795,8 @@ impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> {
|
|||
// Reference example:
|
||||
//
|
||||
// * `SELECT f64 FROM m0 WHERE tag0 = '' + tag0`
|
||||
Ok(lit(false))
|
||||
}
|
||||
lit(false)
|
||||
})
|
||||
}
|
||||
_ => Ok(expr),
|
||||
}
|
||||
|
@ -829,10 +826,7 @@ mod test {
|
|||
.build()
|
||||
.expect("schema failed");
|
||||
let df_schema: DFSchemaRef = Arc::clone(iox_schema.inner()).to_dfschema_ref().unwrap();
|
||||
Schemas {
|
||||
df_schema,
|
||||
iox_schema,
|
||||
}
|
||||
Schemas { df_schema }
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
#![allow(dead_code)]
|
||||
|
||||
use crate::plan::expr_type_evaluator::evaluate_type;
|
||||
use crate::plan::field::{field_by_name, field_name};
|
||||
use crate::plan::field_mapper::{field_and_dimensions, FieldTypeMap, TagSet};
|
||||
|
@ -7,7 +5,9 @@ use crate::plan::{error, util, SchemaProvider};
|
|||
use datafusion::common::{DataFusionError, Result};
|
||||
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
|
||||
use influxdb_influxql_parser::expression::walk::{walk_expr, walk_expr_mut};
|
||||
use influxdb_influxql_parser::expression::{Call, Expr, VarRef, VarRefDataType, WildcardType};
|
||||
use influxdb_influxql_parser::expression::{
|
||||
AsVarRefExpr, Call, Expr, VarRef, VarRefDataType, WildcardType,
|
||||
};
|
||||
use influxdb_influxql_parser::functions::is_scalar_math_function;
|
||||
use influxdb_influxql_parser::identifier::Identifier;
|
||||
use influxdb_influxql_parser::literal::Literal;
|
||||
|
@ -20,6 +20,73 @@ use std::borrow::Borrow;
|
|||
use std::collections::{HashMap, HashSet};
|
||||
use std::ops::{ControlFlow, Deref};
|
||||
|
||||
/// Recursively rewrite the specified [`SelectStatement`] by performing a series of passes
|
||||
/// to validate and normalize the statement.
|
||||
pub(crate) fn rewrite_statement(
|
||||
s: &dyn SchemaProvider,
|
||||
q: &SelectStatement,
|
||||
) -> Result<SelectStatement> {
|
||||
let mut stmt = q.clone();
|
||||
from_expand_wildcards(s, &mut stmt)?;
|
||||
field_list_expand_wildcards(s, &mut stmt)?;
|
||||
from_drop_empty(s, &mut stmt);
|
||||
field_list_normalize_time(&mut stmt);
|
||||
field_list_rewrite_aliases(&mut stmt.fields)?;
|
||||
|
||||
Ok(stmt)
|
||||
}
|
||||
|
||||
/// Ensure the time field is added to all projections,
|
||||
/// and is moved to the first position, which is a requirement
|
||||
/// for InfluxQL compatibility.
|
||||
fn field_list_normalize_time(stmt: &mut SelectStatement) {
|
||||
fn normalize_time(stmt: &mut SelectStatement, is_subquery: bool) {
|
||||
let mut fields = stmt.fields.take();
|
||||
|
||||
if let Some(f) = match fields
|
||||
.iter()
|
||||
.find_position(
|
||||
|f| matches!(&f.expr, Expr::VarRef(VarRef { name, .. }) if name.deref() == "time"),
|
||||
)
|
||||
.map(|(i, _)| i)
|
||||
{
|
||||
Some(0) => None,
|
||||
Some(idx) => Some(fields.remove(idx)),
|
||||
None => Some(Field {
|
||||
expr: "time".to_var_ref_expr(),
|
||||
alias: None,
|
||||
}),
|
||||
} {
|
||||
fields.insert(0, f)
|
||||
}
|
||||
|
||||
let f = &mut fields[0];
|
||||
|
||||
// time aliases in subqueries is ignored
|
||||
if f.alias.is_none() || is_subquery {
|
||||
f.alias = Some("time".into())
|
||||
}
|
||||
|
||||
if let Expr::VarRef(VarRef {
|
||||
ref mut data_type, ..
|
||||
}) = f.expr
|
||||
{
|
||||
*data_type = Some(VarRefDataType::Timestamp);
|
||||
}
|
||||
|
||||
stmt.fields.replace(fields);
|
||||
}
|
||||
|
||||
normalize_time(stmt, false);
|
||||
|
||||
for stmt in stmt.from.iter_mut().filter_map(|ms| match ms {
|
||||
MeasurementSelection::Subquery(stmt) => Some(stmt),
|
||||
_ => None,
|
||||
}) {
|
||||
normalize_time(stmt, true)
|
||||
}
|
||||
}
|
||||
|
||||
/// Recursively expand the `from` clause of `stmt` and any subqueries.
|
||||
fn from_expand_wildcards(s: &dyn SchemaProvider, stmt: &mut SelectStatement) -> Result<()> {
|
||||
let mut new_from = Vec::new();
|
||||
|
@ -537,21 +604,6 @@ fn field_list_rewrite_aliases(field_list: &mut FieldList) -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Recursively rewrite the specified [`SelectStatement`], expanding any wildcards or regular expressions
|
||||
/// found in the projection list, `FROM` clause or `GROUP BY` clause.
|
||||
pub(crate) fn rewrite_statement(
|
||||
s: &dyn SchemaProvider,
|
||||
q: &SelectStatement,
|
||||
) -> Result<SelectStatement> {
|
||||
let mut stmt = q.clone();
|
||||
from_expand_wildcards(s, &mut stmt)?;
|
||||
field_list_expand_wildcards(s, &mut stmt)?;
|
||||
from_drop_empty(s, &mut stmt);
|
||||
field_list_rewrite_aliases(&mut stmt.fields)?;
|
||||
|
||||
Ok(stmt)
|
||||
}
|
||||
|
||||
/// Check the length of the arguments slice is within
|
||||
/// the expected bounds.
|
||||
macro_rules! check_exp_args {
|
||||
|
@ -727,6 +779,8 @@ impl FieldChecker {
|
|||
impl FieldChecker {
|
||||
fn check_expr(&mut self, e: &Expr) -> Result<()> {
|
||||
match e {
|
||||
// The `time` column is ignored
|
||||
Expr::VarRef(VarRef { name, .. }) if name.deref() == "time" => Ok(()),
|
||||
Expr::VarRef(_) => {
|
||||
self.has_non_aggregate_fields = true;
|
||||
Ok(())
|
||||
|
@ -1267,13 +1321,61 @@ pub(crate) fn select_statement_info(q: &SelectStatement) -> Result<SelectStateme
|
|||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::plan::rewriter::{
|
||||
has_wildcards, rewrite_statement, select_statement_info, ProjectionType,
|
||||
field_list_normalize_time, has_wildcards, rewrite_statement, select_statement_info,
|
||||
ProjectionType,
|
||||
};
|
||||
use crate::plan::test_utils::{parse_select, MockSchemaProvider};
|
||||
use assert_matches::assert_matches;
|
||||
use datafusion::error::DataFusionError;
|
||||
use test_helpers::{assert_contains, assert_error};
|
||||
|
||||
#[test]
fn test_field_list_normalize_time() {
    // Parse the query, apply the normalization pass and render the result as text
    let normalize = |query: &str| {
        let mut sel = parse_select(query);
        field_list_normalize_time(&mut sel);
        sel.to_string()
    };

    // adds time to the first position
    assert_eq!(
        normalize("SELECT foo, bar FROM cpu"),
        "SELECT time::timestamp AS time, foo, bar FROM cpu"
    );

    // moves time to the first position
    assert_eq!(
        normalize("SELECT foo, time, bar FROM cpu"),
        "SELECT time::timestamp AS time, foo, bar FROM cpu"
    );

    // maintains the alias of the time column
    assert_eq!(
        normalize("SELECT time as ts, foo, bar FROM cpu"),
        "SELECT time::timestamp AS ts, foo, bar FROM cpu"
    );

    // subqueries

    // adds time to the first position of both the root and the subquery
    assert_eq!(
        normalize("SELECT foo FROM (SELECT foo, bar FROM cpu)"),
        "SELECT time::timestamp AS time, foo FROM (SELECT time::timestamp AS time, foo, bar FROM cpu)"
    );

    // The alias of the time column within the subquery is dropped, the root
    // keeps `ts` as an ordinary column and has a time column added.
    //
    // Whilst confusing, this matches InfluxQL behaviour
    assert_eq!(
        normalize("SELECT ts, foo FROM (SELECT time as ts, foo, bar FROM cpu)"),
        "SELECT time::timestamp AS time, ts, foo FROM (SELECT time::timestamp AS time, foo, bar FROM cpu)"
    );
}
|
||||
|
||||
#[test]
|
||||
fn test_select_statement_info() {
|
||||
let info = select_statement_info(&parse_select("SELECT foo, bar FROM cpu")).unwrap();
|
||||
|
@ -1530,15 +1632,15 @@ mod test {
|
|||
let sel = parse_select("SELECT count(distinct('foo')) FROM cpu");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::Plan(ref s) if s == "expected field argument in distinct()");
|
||||
let sel = parse_select("SELECT count(distinct foo) FROM cpu");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "internal: unexpected distinct clause in count");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "InfluxQL internal error: unexpected distinct clause in count");
|
||||
|
||||
// Test rules for math functions
|
||||
let sel = parse_select("SELECT abs(usage_idle) FROM cpu");
|
||||
select_statement_info(&sel).unwrap();
|
||||
let sel = parse_select("SELECT abs(*) + ceil(foo) FROM cpu");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "internal: unexpected wildcard");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "InfluxQL internal error: unexpected wildcard");
|
||||
let sel = parse_select("SELECT abs(/f/) + ceil(foo) FROM cpu");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "internal: unexpected regex");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "InfluxQL internal error: unexpected regex");
|
||||
|
||||
// Fallible
|
||||
|
||||
|
@ -1560,11 +1662,11 @@ mod test {
|
|||
|
||||
// wildcard expansion is not supported in binary expressions for aggregates
|
||||
let sel = parse_select("SELECT count(*) + count(foo) FROM cpu");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "internal: unexpected wildcard or regex");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "InfluxQL internal error: unexpected wildcard or regex");
|
||||
|
||||
// regex expansion is not supported in binary expressions
|
||||
let sel = parse_select("SELECT sum(/foo/) + count(foo) FROM cpu");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "internal: unexpected wildcard or regex");
|
||||
assert_error!(select_statement_info(&sel), DataFusionError::External(ref s) if s.to_string() == "InfluxQL internal error: unexpected wildcard or regex");
|
||||
|
||||
// aggregate functions require a field reference
|
||||
let sel = parse_select("SELECT sum(1) FROM cpu");
|
||||
|
@ -1579,7 +1681,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_user::float AS usage_user FROM cpu"
|
||||
"SELECT time::timestamp AS time, usage_user::float AS usage_user FROM cpu"
|
||||
);
|
||||
|
||||
// Duplicate columns do not have conflicting aliases
|
||||
|
@ -1587,7 +1689,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_user::float AS usage_user, usage_user::float AS usage_user_1 FROM cpu"
|
||||
"SELECT time::timestamp AS time, usage_user::float AS usage_user, usage_user::float AS usage_user_1 FROM cpu"
|
||||
);
|
||||
|
||||
// Multiple aliases with no conflicts
|
||||
|
@ -1595,21 +1697,21 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_user::float AS usage_user_1, usage_user::float AS usage_user FROM cpu"
|
||||
"SELECT time::timestamp AS time, usage_user::float AS usage_user_1, usage_user::float AS usage_user FROM cpu"
|
||||
);
|
||||
|
||||
// Multiple aliases with conflicts
|
||||
let stmt =
|
||||
parse_select("SELECT usage_user as usage_user_1, usage_user, usage_user, usage_user as usage_user_2, usage_user, usage_user_2 FROM cpu");
|
||||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(stmt.to_string(), "SELECT usage_user::float AS usage_user_1, usage_user::float AS usage_user, usage_user::float AS usage_user_3, usage_user::float AS usage_user_2, usage_user::float AS usage_user_4, usage_user_2 AS usage_user_2_1 FROM cpu");
|
||||
assert_eq!(stmt.to_string(), "SELECT time::timestamp AS time, usage_user::float AS usage_user_1, usage_user::float AS usage_user, usage_user::float AS usage_user_3, usage_user::float AS usage_user_2, usage_user::float AS usage_user_4, usage_user_2 AS usage_user_2_1 FROM cpu");
|
||||
|
||||
// Only include measurements with at least one field projection
|
||||
let stmt = parse_select("SELECT usage_idle FROM cpu, disk");
|
||||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_idle::float AS usage_idle FROM cpu"
|
||||
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu"
|
||||
);
|
||||
|
||||
// Rewriting FROM clause
|
||||
|
@ -1619,7 +1721,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT bytes_free::integer AS bytes_free, bytes_read::integer AS bytes_read FROM disk, diskio"
|
||||
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free, bytes_read::integer AS bytes_read FROM disk, diskio"
|
||||
);
|
||||
|
||||
// Regex matches multiple measurements, but only one has a matching field
|
||||
|
@ -1627,7 +1729,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT bytes_free::integer AS bytes_free FROM disk"
|
||||
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free FROM disk"
|
||||
);
|
||||
|
||||
// Exact, no match
|
||||
|
@ -1647,14 +1749,14 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT cpu::tag AS cpu, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu"
|
||||
"SELECT time::timestamp AS time, cpu::tag AS cpu, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu"
|
||||
);
|
||||
|
||||
let stmt = parse_select("SELECT * FROM cpu, disk");
|
||||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, cpu::tag AS cpu, device::tag AS device, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
|
||||
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, cpu::tag AS cpu, device::tag AS device, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
|
||||
);
|
||||
|
||||
// Regular expression selects fields from multiple measurements
|
||||
|
@ -1662,7 +1764,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
|
||||
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free, bytes_used::integer AS bytes_used, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu, disk"
|
||||
);
|
||||
|
||||
// Selective wildcard for tags
|
||||
|
@ -1670,7 +1772,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT cpu::tag AS cpu, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle FROM cpu"
|
||||
"SELECT time::timestamp AS time, cpu::tag AS cpu, host::tag AS host, region::tag AS region, usage_idle::float AS usage_idle FROM cpu"
|
||||
);
|
||||
|
||||
// Selective wildcard for tags only should not select any measurements
|
||||
|
@ -1683,7 +1785,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu"
|
||||
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu"
|
||||
);
|
||||
|
||||
// Mixed fields and wildcards
|
||||
|
@ -1691,7 +1793,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_idle::float AS usage_idle, cpu::tag AS cpu, host::tag AS host, region::tag AS region FROM cpu"
|
||||
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle, cpu::tag AS cpu, host::tag AS host, region::tag AS region FROM cpu"
|
||||
);
|
||||
|
||||
// GROUP BY expansion
|
||||
|
@ -1700,14 +1802,14 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_idle::float AS usage_idle FROM cpu GROUP BY host"
|
||||
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu GROUP BY host"
|
||||
);
|
||||
|
||||
let stmt = parse_select("SELECT usage_idle FROM cpu GROUP BY *");
|
||||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_idle::float AS usage_idle FROM cpu GROUP BY cpu, host, region"
|
||||
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu GROUP BY cpu, host, region"
|
||||
);
|
||||
|
||||
// Does not include tags in projection when expanded in GROUP BY
|
||||
|
@ -1715,7 +1817,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu GROUP BY cpu, host, region"
|
||||
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu GROUP BY cpu, host, region"
|
||||
);
|
||||
|
||||
// Does include explicitly listed tags in projection
|
||||
|
@ -1723,7 +1825,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT host::tag AS host, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu GROUP BY cpu, host, region"
|
||||
"SELECT time::timestamp AS time, host::tag AS host, usage_idle::float AS usage_idle, usage_system::float AS usage_system, usage_user::float AS usage_user FROM cpu GROUP BY cpu, host, region"
|
||||
);
|
||||
|
||||
// Fallible
|
||||
|
@ -1740,7 +1842,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_idle::float AS usage_idle FROM (SELECT usage_idle::float FROM cpu)"
|
||||
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM (SELECT time::timestamp AS time, usage_idle::float FROM cpu)"
|
||||
);
|
||||
|
||||
// Subquery, regex, match
|
||||
|
@ -1748,7 +1850,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT bytes_free::integer AS bytes_free FROM (SELECT bytes_free::integer, bytes_read::integer FROM disk, diskio)"
|
||||
"SELECT time::timestamp AS time, bytes_free::integer AS bytes_free FROM (SELECT time::timestamp AS time, bytes_free::integer, bytes_read::integer FROM disk, diskio)"
|
||||
);
|
||||
|
||||
// Subquery, exact, no match
|
||||
|
@ -1766,7 +1868,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_system_usage_idle::float AS usage_system_usage_idle FROM (SELECT usage_system::float + usage_idle::float FROM cpu)"
|
||||
"SELECT time::timestamp AS time, usage_system_usage_idle::float AS usage_system_usage_idle FROM (SELECT time::timestamp AS time, usage_system::float + usage_idle::float FROM cpu)"
|
||||
);
|
||||
|
||||
// Subquery, no fields projected should be dropped
|
||||
|
@ -1774,7 +1876,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT usage_idle::float AS usage_idle FROM cpu"
|
||||
"SELECT time::timestamp AS time, usage_idle::float AS usage_idle FROM cpu"
|
||||
);
|
||||
|
||||
// Outer queries are permitted to project tags only, as long as there are other fields
|
||||
|
@ -1783,7 +1885,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT cpu::tag AS cpu FROM (SELECT cpu::tag, usage_system::float FROM cpu)"
|
||||
"SELECT time::timestamp AS time, cpu::tag AS cpu FROM (SELECT time::timestamp AS time, cpu::tag, usage_system::float FROM cpu)"
|
||||
);
|
||||
|
||||
// Outer FROM should be empty, as the subquery does not project any fields
|
||||
|
@ -1796,7 +1898,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT bytes_free::integer + bytes_used::integer AS bytes_free_bytes_used FROM disk"
|
||||
"SELECT time::timestamp AS time, bytes_free::integer + bytes_used::integer AS bytes_free_bytes_used FROM disk"
|
||||
);
|
||||
|
||||
// Unary expressions
|
||||
|
@ -1804,7 +1906,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT -1 * bytes_free::integer AS bytes_free FROM disk"
|
||||
"SELECT time::timestamp AS time, -1 * bytes_free::integer AS bytes_free FROM disk"
|
||||
);
|
||||
|
||||
// DISTINCT clause
|
||||
|
@ -1814,14 +1916,14 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT count(distinct(bytes_free::integer)) AS count FROM disk"
|
||||
"SELECT time::timestamp AS time, count(distinct(bytes_free::integer)) AS count FROM disk"
|
||||
);
|
||||
|
||||
let stmt = parse_select("SELECT DISTINCT bytes_free FROM disk");
|
||||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT distinct(bytes_free::integer) AS \"distinct\" FROM disk"
|
||||
"SELECT time::timestamp AS time, distinct(bytes_free::integer) AS \"distinct\" FROM disk"
|
||||
);
|
||||
|
||||
// Call expressions
|
||||
|
@ -1830,7 +1932,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT count(field_i64::integer) AS count FROM temp_01"
|
||||
"SELECT time::timestamp AS time, count(field_i64::integer) AS count FROM temp_01"
|
||||
);
|
||||
|
||||
// Duplicate aggregate columns
|
||||
|
@ -1838,14 +1940,14 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT count(field_i64::integer) AS count, count(field_i64::integer) AS count_1 FROM temp_01"
|
||||
"SELECT time::timestamp AS time, count(field_i64::integer) AS count, count(field_i64::integer) AS count_1 FROM temp_01"
|
||||
);
|
||||
|
||||
let stmt = parse_select("SELECT COUNT(field_f64) FROM temp_01");
|
||||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT count(field_f64::float) AS count FROM temp_01"
|
||||
"SELECT time::timestamp AS time, count(field_f64::float) AS count FROM temp_01"
|
||||
);
|
||||
|
||||
// Expands all fields
|
||||
|
@ -1853,7 +1955,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT count(field_f64::float) AS count_field_f64, count(field_i64::integer) AS count_field_i64, count(field_str::string) AS count_field_str, count(field_u64::unsigned) AS count_field_u64, count(shared_field0::float) AS count_shared_field0 FROM temp_01"
|
||||
"SELECT time::timestamp AS time, count(field_f64::float) AS count_field_f64, count(field_i64::integer) AS count_field_i64, count(field_str::string) AS count_field_str, count(field_u64::unsigned) AS count_field_u64, count(shared_field0::float) AS count_shared_field0 FROM temp_01"
|
||||
);
|
||||
|
||||
// Expands matching fields
|
||||
|
@ -1861,7 +1963,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT count(field_f64::float) AS count_field_f64, count(field_i64::integer) AS count_field_i64, count(field_u64::unsigned) AS count_field_u64 FROM temp_01"
|
||||
"SELECT time::timestamp AS time, count(field_f64::float) AS count_field_f64, count(field_i64::integer) AS count_field_i64, count(field_u64::unsigned) AS count_field_u64 FROM temp_01"
|
||||
);
|
||||
|
||||
// Expands only numeric fields
|
||||
|
@ -1869,14 +1971,14 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT sum(field_f64::float) AS sum_field_f64, sum(field_i64::integer) AS sum_field_i64, sum(field_u64::unsigned) AS sum_field_u64, sum(shared_field0::float) AS sum_shared_field0 FROM temp_01"
|
||||
"SELECT time::timestamp AS time, sum(field_f64::float) AS sum_field_f64, sum(field_i64::integer) AS sum_field_i64, sum(field_u64::unsigned) AS sum_field_u64, sum(shared_field0::float) AS sum_shared_field0 FROM temp_01"
|
||||
);
|
||||
|
||||
let stmt = parse_select("SELECT * FROM merge_00, merge_01");
|
||||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT col0::float AS col0, col0::tag AS col0_1, col1::float AS col1, col1::tag AS col1_1, col2::string AS col2, col3::string AS col3 FROM merge_00, merge_01"
|
||||
"SELECT time::timestamp AS time, col0::float AS col0, col0::tag AS col0_1, col1::float AS col1, col1::tag AS col1_1, col2::string AS col2, col3::string AS col3 FROM merge_00, merge_01"
|
||||
);
|
||||
|
||||
// This should only select merge_01, as col0 is a tag in merge_00
|
||||
|
@ -1884,7 +1986,7 @@ mod test {
|
|||
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
|
||||
assert_eq!(
|
||||
stmt.to_string(),
|
||||
"SELECT col0::float AS col0, col0::tag AS col0_1 FROM merge_01"
|
||||
"SELECT time::timestamp AS time, col0::float AS col0, col0::tag AS col0_1 FROM merge_01"
|
||||
);
|
||||
|
||||
// Fallible cases
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::plan::{error, util_copy};
|
||||
use arrow::datatypes::{DataType, TimeUnit};
|
||||
use datafusion::common::{DFSchema, DFSchemaRef, Result};
|
||||
use datafusion::common::{DFSchemaRef, Result};
|
||||
use datafusion::logical_expr::utils::expr_as_column_expr;
|
||||
use datafusion::logical_expr::{lit, Expr, ExprSchemable, LogicalPlan, Operator};
|
||||
use datafusion::scalar::ScalarValue;
|
||||
|
@ -9,7 +9,6 @@ use influxdb_influxql_parser::literal::Number;
|
|||
use influxdb_influxql_parser::string::Regex;
|
||||
use query_functions::clean_non_meta_escapes;
|
||||
use query_functions::coalesce_struct::coalesce_struct;
|
||||
use schema::Schema;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(in crate::plan) fn binary_operator_to_df_operator(op: BinaryOperator) -> Operator {
|
||||
|
@ -25,29 +24,25 @@ pub(in crate::plan) fn binary_operator_to_df_operator(op: BinaryOperator) -> Ope
|
|||
}
|
||||
}
|
||||
|
||||
/// Return the IOx schema for the specified DataFusion schema.
|
||||
pub(in crate::plan) fn schema_from_df(schema: &DFSchema) -> Result<Schema> {
|
||||
let s: Arc<arrow::datatypes::Schema> = Arc::new(schema.into());
|
||||
s.try_into().map_err(|err| {
|
||||
error::map::internal(format!(
|
||||
"unable to convert DataFusion schema to IOx schema: {err}"
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
/// Container for both the DataFusion and equivalent IOx schema.
|
||||
pub(in crate::plan) struct Schemas {
|
||||
pub(in crate::plan) df_schema: DFSchemaRef,
|
||||
pub(in crate::plan) iox_schema: Schema,
|
||||
}
|
||||
|
||||
impl Schemas {
|
||||
pub(in crate::plan) fn new(df_schema: &DFSchemaRef) -> Result<Self> {
|
||||
Ok(Self {
|
||||
df_schema: Arc::clone(df_schema),
|
||||
iox_schema: schema_from_df(df_schema)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns `true` if the field `name` is a tag type.
|
||||
pub(super) fn is_tag_field(&self, name: &str) -> bool {
|
||||
self.df_schema
|
||||
.fields()
|
||||
.iter()
|
||||
.any(|f| f.name() == name && matches!(f.data_type(), DataType::Dictionary(..)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Sanitize an InfluxQL regular expression and create a compiled [`regex::Regex`].
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
use crate::plan::error;
|
||||
use arrow::datatypes::DataType;
|
||||
use datafusion::common::Result;
|
||||
use influxdb_influxql_parser::expression::VarRefDataType;
|
||||
use schema::{InfluxColumnType, InfluxFieldType};
|
||||
use schema::InfluxFieldType;
|
||||
|
||||
pub(crate) fn var_ref_data_type_to_data_type(v: VarRefDataType) -> Option<DataType> {
|
||||
match v {
|
||||
|
@ -25,12 +27,17 @@ pub(crate) fn field_type_to_var_ref_data_type(v: InfluxFieldType) -> VarRefDataT
|
|||
}
|
||||
}
|
||||
|
||||
/// Maps an [`InfluxColumnType`] to a [`VarRefDataType`].
|
||||
pub(crate) fn column_type_to_var_ref_data_type(v: InfluxColumnType) -> VarRefDataType {
|
||||
match v {
|
||||
InfluxColumnType::Tag => VarRefDataType::Tag,
|
||||
InfluxColumnType::Field(ft) => field_type_to_var_ref_data_type(ft),
|
||||
InfluxColumnType::Timestamp => VarRefDataType::Timestamp,
|
||||
/// Maps an Arrow [`DataType`] to a [`VarRefDataType`].
|
||||
pub(crate) fn data_type_to_var_ref_data_type(dt: DataType) -> Result<VarRefDataType> {
|
||||
match dt {
|
||||
DataType::Dictionary(..) => Ok(VarRefDataType::Tag),
|
||||
DataType::Timestamp(..) => Ok(VarRefDataType::Timestamp),
|
||||
DataType::Utf8 => Ok(VarRefDataType::String),
|
||||
DataType::Int64 => Ok(VarRefDataType::Integer),
|
||||
DataType::UInt64 => Ok(VarRefDataType::Unsigned),
|
||||
DataType::Float64 => Ok(VarRefDataType::Float),
|
||||
DataType::Boolean => Ok(VarRefDataType::Boolean),
|
||||
_ => error::internal(format!("unable to map Arrow type {dt} to VarRefDataType")),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue