Merge pull request #7563 from influxdata/sgc/issue/last_7538

feat: Teach InfluxQL `last`, `first`, `min` and `max` selector functions
pull/24376/head
kodiakhq[bot] 2023-04-17 20:41:26 +00:00 committed by GitHub
commit 79ed8da7e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1041 additions and 803 deletions

1
Cargo.lock generated
View File

@ -2946,6 +2946,7 @@ dependencies = [
"serde",
"serde_json",
"test_helpers",
"thiserror",
"workspace-hack",
]

View File

@ -422,3 +422,43 @@ SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-1
-- aggregate, group by TIME and tag, multiple measurements
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device LIMIT 1;
--
-- Selector functions
--
-- Single selectors, which should produce a non-zero timestamp
-- for the matching value of the selector function.
SELECT FIRST(usage_idle) FROM cpu;
SELECT LAST(usage_idle) FROM cpu;
SELECT MAX(usage_idle) FROM cpu;
SELECT MIN(usage_idle) FROM cpu;
-- Single selectors with a GROUP BY tag should produce non-zero
-- timestamps for the values in each group
SELECT FIRST(usage_idle) FROM cpu GROUP BY cpu;
SELECT LAST(usage_idle) FROM cpu GROUP BY cpu;
SELECT MAX(usage_idle) FROM cpu GROUP BY cpu;
SELECT MIN(usage_idle) FROM cpu GROUP BY cpu;
-- Adding group by time means the timestamp
-- should be the start of each window
SELECT FIRST(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:00:30Z' GROUP BY TIME(10s);
-- Multiple selectors result in an aggregate query and therefore
-- a zero timestamp value.
SELECT FIRST(usage_idle), MAX(usage_idle) FROM cpu;
-- Mixing selectors and aggregates,
-- timestamp should be start of each window
SELECT FIRST(usage_idle), COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu LIMIT 1;
-- FILL(0)
SELECT FIRST(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(0);
--
-- Not implemented: Selector queries with tag and field projections
-- See: https://github.com/influxdata/influxdb_iox/issues/7533
--
SELECT FIRST(usage_idle), usage_user, usage_system FROM cpu;

View File

@ -2190,4 +2190,191 @@ tags: cpu=, device=disk1s5
| time | count | count_1 |
+---------------------+-------+---------+
| 2022-10-31T02:00:00 | | 2 |
+---------------------+-------+---------+
+---------------------+-------+---------+
-- InfluxQL: SELECT FIRST(usage_idle) FROM cpu;
name: cpu
+---------------------+-------+
| time | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 2.98 |
+---------------------+-------+
-- InfluxQL: SELECT LAST(usage_idle) FROM cpu;
name: cpu
+---------------------+------+
| time | last |
+---------------------+------+
| 2022-10-31T02:00:10 | 2.99 |
+---------------------+------+
-- InfluxQL: SELECT MAX(usage_idle) FROM cpu;
name: cpu
+---------------------+------+
| time | max |
+---------------------+------+
| 2022-10-31T02:00:10 | 2.99 |
+---------------------+------+
-- InfluxQL: SELECT MIN(usage_idle) FROM cpu;
name: cpu
+---------------------+------+
| time | min |
+---------------------+------+
| 2022-10-31T02:00:00 | 0.98 |
+---------------------+------+
-- InfluxQL: SELECT FIRST(usage_idle) FROM cpu GROUP BY cpu;
name: cpu
tags: cpu=cpu-total
+---------------------+-------+
| time | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 2.98 |
+---------------------+-------+
name: cpu
tags: cpu=cpu0
+---------------------+-------+
| time | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 0.98 |
+---------------------+-------+
name: cpu
tags: cpu=cpu1
+---------------------+-------+
| time | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 1.98 |
+---------------------+-------+
-- InfluxQL: SELECT LAST(usage_idle) FROM cpu GROUP BY cpu;
name: cpu
tags: cpu=cpu-total
+---------------------+------+
| time | last |
+---------------------+------+
| 2022-10-31T02:00:10 | 2.99 |
+---------------------+------+
name: cpu
tags: cpu=cpu0
+---------------------+------+
| time | last |
+---------------------+------+
| 2022-10-31T02:00:10 | 0.99 |
+---------------------+------+
name: cpu
tags: cpu=cpu1
+---------------------+------+
| time | last |
+---------------------+------+
| 2022-10-31T02:00:10 | 1.99 |
+---------------------+------+
-- InfluxQL: SELECT MAX(usage_idle) FROM cpu GROUP BY cpu;
name: cpu
tags: cpu=cpu-total
+---------------------+------+
| time | max |
+---------------------+------+
| 2022-10-31T02:00:10 | 2.99 |
+---------------------+------+
name: cpu
tags: cpu=cpu0
+---------------------+------+
| time | max |
+---------------------+------+
| 2022-10-31T02:00:10 | 0.99 |
+---------------------+------+
name: cpu
tags: cpu=cpu1
+---------------------+------+
| time | max |
+---------------------+------+
| 2022-10-31T02:00:10 | 1.99 |
+---------------------+------+
-- InfluxQL: SELECT MIN(usage_idle) FROM cpu GROUP BY cpu;
name: cpu
tags: cpu=cpu-total
+---------------------+------+
| time | min |
+---------------------+------+
| 2022-10-31T02:00:00 | 2.98 |
+---------------------+------+
name: cpu
tags: cpu=cpu0
+---------------------+------+
| time | min |
+---------------------+------+
| 2022-10-31T02:00:00 | 0.98 |
+---------------------+------+
name: cpu
tags: cpu=cpu1
+---------------------+------+
| time | min |
+---------------------+------+
| 2022-10-31T02:00:00 | 1.98 |
+---------------------+------+
-- InfluxQL: SELECT FIRST(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:00:30Z' GROUP BY TIME(10s);
name: cpu
+---------------------+-------+
| time | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 2.98 |
| 2022-10-31T02:00:10 | 2.99 |
| 2022-10-31T02:00:20 | |
+---------------------+-------+
-- InfluxQL: SELECT FIRST(usage_idle), MAX(usage_idle) FROM cpu;
name: cpu
+---------------------+-------+------+
| time | first | max |
+---------------------+-------+------+
| 1970-01-01T00:00:00 | 2.98 | 2.99 |
+---------------------+-------+------+
-- InfluxQL: SELECT FIRST(usage_idle), COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu LIMIT 1;
name: cpu
tags: cpu=cpu-total
+---------------------+-------+-------+
| time | first | count |
+---------------------+-------+-------+
| 2022-10-31T02:00:00 | 2.98 | 2 |
+---------------------+-------+-------+
name: cpu
tags: cpu=cpu0
+---------------------+-------+-------+
| time | first | count |
+---------------------+-------+-------+
| 2022-10-31T02:00:00 | 0.98 | 2 |
+---------------------+-------+-------+
name: cpu
tags: cpu=cpu1
+---------------------+-------+-------+
| time | first | count |
+---------------------+-------+-------+
| 2022-10-31T02:00:00 | 1.98 | 2 |
+---------------------+-------+-------+
-- InfluxQL: SELECT FIRST(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(0);
name: cpu
tags: cpu=cpu-total
+---------------------+-------+
| time | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 2.98 |
| 2022-10-31T02:00:30 | 0.0 |
| 2022-10-31T02:01:00 | 0.0 |
| 2022-10-31T02:01:30 | 0.0 |
+---------------------+-------+
name: cpu
tags: cpu=cpu0
+---------------------+-------+
| time | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 0.98 |
| 2022-10-31T02:00:30 | 0.0 |
| 2022-10-31T02:01:00 | 0.0 |
| 2022-10-31T02:01:30 | 0.0 |
+---------------------+-------+
name: cpu
tags: cpu=cpu1
+---------------------+-------+
| time | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 1.98 |
| 2022-10-31T02:00:30 | 0.0 |
| 2022-10-31T02:01:00 | 0.0 |
| 2022-10-31T02:01:30 | 0.0 |
+---------------------+-------+
-- InfluxQL: SELECT FIRST(usage_idle), usage_user, usage_system FROM cpu;
Error while planning query: This feature is not implemented: projections with a single selector and fields: See https://github.com/influxdata/influxdb_iox/issues/7533

View File

@ -22,6 +22,7 @@ query_functions = { path = "../query_functions"}
regex = "1"
schema = { path = "../schema" }
serde_json = "1.0.96"
thiserror = "1.0"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies] # In alphabetical order

View File

@ -0,0 +1,45 @@
use datafusion::common::Result;
/// An error that was the result of an invalid InfluxQL query.
pub(crate) fn query<T>(s: impl Into<String>) -> Result<T> {
Err(map::query(s))
}
/// An unexpected error whilst planning that represents a bug in IOx.
pub(crate) fn internal<T>(s: impl Into<String>) -> Result<T> {
Err(map::internal(s))
}
/// The specified `feature` is not implemented.
pub(crate) fn not_implemented<T>(feature: impl Into<String>) -> Result<T> {
Err(map::not_implemented(feature))
}
/// Functions that return a DataFusionError rather than a `Result<T, DataFusionError>`
/// making them convenient to use with functions like `map_err`.
pub(crate) mod map {
use datafusion::common::DataFusionError;
use thiserror::Error;
#[derive(Debug, Error)]
enum PlannerError {
/// An unexpected error that represents a bug in IOx.
#[error("internal: {0}")]
Internal(String),
}
/// An error that was the result of an invalid InfluxQL query.
pub(crate) fn query(s: impl Into<String>) -> DataFusionError {
DataFusionError::Plan(s.into())
}
/// An unexpected error whilst planning that represents a bug in IOx.
pub(crate) fn internal(s: impl Into<String>) -> DataFusionError {
DataFusionError::External(Box::new(PlannerError::Internal(s.into())))
}
/// The specified `feature` is not implemented.
pub(crate) fn not_implemented(feature: impl Into<String>) -> DataFusionError {
DataFusionError::NotImplemented(feature.into())
}
}

View File

@ -1,7 +1,7 @@
use crate::plan::field::field_by_name;
use crate::plan::field_mapper::map_type;
use crate::plan::SchemaProvider;
use datafusion::common::{DataFusionError, Result};
use crate::plan::{error, SchemaProvider};
use datafusion::common::Result;
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
use influxdb_influxql_parser::expression::{Call, Expr, VarRef, VarRefDataType};
use influxdb_influxql_parser::literal::Literal;
@ -113,9 +113,7 @@ impl<'a> TypeEvaluator<'a> {
}
}
_ => {
return Err(DataFusionError::Internal(
"eval_var_ref: Unexpected MeasurementSelection".to_string(),
))
return error::internal("eval_var_ref: Unexpected MeasurementSelection")
}
}
}

View File

@ -1,5 +1,3 @@
#![allow(dead_code)]
use crate::plan::var_ref::field_type_to_var_ref_data_type;
use crate::plan::SchemaProvider;
use datafusion::common::Result;

View File

@ -1,3 +1,4 @@
mod error;
mod expr_type_evaluator;
mod field;
mod field_mapper;

View File

@ -1,5 +1,6 @@
mod select;
use crate::plan::error;
use crate::plan::planner::select::{
check_exprs_satisfy_columns, fields_to_exprs_no_nulls, make_tag_key_column_meta,
plan_with_sort, ToSortExpr,
@ -8,7 +9,9 @@ use crate::plan::planner_rewrite_expression::{rewrite_conditional, rewrite_expr}
use crate::plan::planner_time_range_expression::{
duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr,
};
use crate::plan::rewriter::rewrite_statement;
use crate::plan::rewriter::{
rewrite_statement, select_statement_info, ProjectionType, SelectStatementInfo,
};
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_to_data_type};
use arrow::array::{StringBuilder, StringDictionaryBuilder};
@ -17,7 +20,7 @@ use arrow::record_batch::RecordBatch;
use chrono_tz::Tz;
use datafusion::catalog::TableReference;
use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, ToDFSchema};
use datafusion::common::{DFSchema, DFSchemaRef, Result, ScalarValue, ToDFSchema};
use datafusion::datasource::{provider_as_source, MemTable};
use datafusion::logical_expr::expr_rewriter::normalize_col;
use datafusion::logical_expr::logical_plan::builder::project;
@ -27,8 +30,8 @@ use datafusion::logical_expr::{
binary_expr, col, date_bin, expr, expr::WindowFunction, lit, lit_timestamp_nano, now,
window_function, Aggregate, AggregateFunction, AggregateUDF, Between, BinaryExpr,
BuiltInWindowFunction, BuiltinScalarFunction, EmptyRelation, Explain, Expr, ExprSchemable,
Extension, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ScalarUDF, TableSource,
ToStringifiedPlan, WindowFrame, WindowFrameBound, WindowFrameUnits,
Extension, GetIndexedField, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ScalarUDF,
TableSource, ToStringifiedPlan, WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use datafusion::prelude::Column;
use datafusion_util::{lit_dict, AsExpr};
@ -62,7 +65,15 @@ use iox_query::exec::gapfill::{FillStrategy, GapFill, GapFillParams};
use iox_query::logical_optimizer::range_predicate::find_time_range;
use itertools::Itertools;
use once_cell::sync::Lazy;
use query_functions::clean_non_meta_escapes;
use query_functions::selectors::{
selector_first, selector_last, selector_max, selector_min, SelectorOutput,
};
use query_functions::{
clean_non_meta_escapes,
selectors::{
struct_selector_first, struct_selector_last, struct_selector_max, struct_selector_min,
},
};
use schema::{
InfluxColumnType, InfluxFieldType, Schema, INFLUXQL_MEASUREMENT_COLUMN_NAME,
INFLUXQL_METADATA_KEY,
@ -128,8 +139,7 @@ struct Context<'a> {
scope: ExprScope,
tz: Option<Tz>,
/// `true` if the query projection specifies aggregate expressions.
is_aggregate: bool,
info: SelectStatementInfo,
// GROUP BY information
group_by: Option<&'a GroupByClause>,
@ -137,8 +147,11 @@ struct Context<'a> {
}
impl<'a> Context<'a> {
fn new() -> Self {
Default::default()
fn new(info: SelectStatementInfo) -> Self {
Self {
info,
..Default::default()
}
}
fn with_scope(&self, scope: ExprScope) -> Self {
@ -158,16 +171,16 @@ impl<'a> Context<'a> {
}
}
fn with_is_aggregate(&self, is_aggregate: bool) -> Self {
Self {
is_aggregate,
..*self
}
}
fn fill(&self) -> FillClause {
self.fill.unwrap_or_default()
}
fn is_aggregate(&self) -> bool {
matches!(
self.info.projection_type,
ProjectionType::Aggregate | ProjectionType::Selector { .. }
)
}
}
#[allow(missing_debug_implementations)]
@ -183,27 +196,20 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
pub fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
match statement {
Statement::CreateDatabase(_) => {
Err(DataFusionError::NotImplemented("CREATE DATABASE".into()))
}
Statement::Delete(_) => Err(DataFusionError::NotImplemented("DELETE".into())),
Statement::DropMeasurement(_) => {
Err(DataFusionError::NotImplemented("DROP MEASUREMENT".into()))
}
Statement::CreateDatabase(_) => error::not_implemented("CREATE DATABASE"),
Statement::Delete(_) => error::not_implemented("DELETE"),
Statement::DropMeasurement(_) => error::not_implemented("DROP MEASUREMENT"),
Statement::Explain(explain) => self.explain_statement_to_plan(*explain),
Statement::Select(select) => self.select_statement_to_plan(
&Context::new(),
&self.rewrite_select_statement(*select)?,
),
Statement::ShowDatabases(_) => {
Err(DataFusionError::NotImplemented("SHOW DATABASES".into()))
Statement::Select(select) => {
self.select_statement_to_plan(&self.rewrite_select_statement(*select)?)
}
Statement::ShowDatabases(_) => error::not_implemented("SHOW DATABASES"),
Statement::ShowMeasurements(show_measurements) => {
self.show_measurements_to_plan(*show_measurements)
}
Statement::ShowRetentionPolicies(_) => Err(DataFusionError::NotImplemented(
"SHOW RETENTION POLICIES".into(),
)),
Statement::ShowRetentionPolicies(_) => {
error::not_implemented("SHOW RETENTION POLICIES")
}
Statement::ShowTagKeys(show_tag_keys) => self.show_tag_keys_to_plan(*show_tag_keys),
Statement::ShowTagValues(show_tag_values) => {
self.show_tag_values_to_plan(*show_tag_values)
@ -215,10 +221,8 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
fn explain_statement_to_plan(&self, explain: ExplainStatement) -> Result<LogicalPlan> {
let plan = self.select_statement_to_plan(
&Context::new(),
&self.rewrite_select_statement(*explain.select)?,
)?;
let plan =
self.select_statement_to_plan(&self.rewrite_select_statement(*explain.select)?)?;
let plan = Arc::new(plan);
let schema = LogicalPlan::explain_schema();
let schema = schema.to_dfschema_ref()?;
@ -227,9 +231,8 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// grouped into tables in the output when formatted as InfluxQL tabular format.
let measurement_column_index = schema
.index_of_column_by_name(None, "plan_type")?
.ok_or_else(|| {
DataFusionError::External("internal: unable to find plan_type column".into())
})? as u32;
.ok_or_else(|| error::map::internal("unable to find plan_type column"))?
as u32;
let (analyze, verbose) = match explain.options {
Some(ExplainOption::AnalyzeVerbose) => (true, true),
@ -269,21 +272,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
/// Create a [`LogicalPlan`] from the specified InfluxQL `SELECT` statement.
fn select_statement_to_plan(
&self,
ctx: &Context<'_>,
select: &SelectStatement,
) -> Result<LogicalPlan> {
fn select_statement_to_plan(&self, select: &SelectStatement) -> Result<LogicalPlan> {
let mut plans = self.plan_from_tables(&select.from)?;
let ctx = ctx
let ctx = Context::new(select_statement_info(select)?)
.with_timezone(select.timezone)
.with_group_by_fill(select)
.with_is_aggregate(
has_aggregate_exprs(&select.fields)
|| (select.group_by.is_some()
&& select.group_by.as_ref().unwrap().time_dimension().is_some()),
);
.with_group_by_fill(select);
// The `time` column is always present in the result set
let mut fields = if find_time_column_index(&select.fields).is_none() {
@ -406,8 +400,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
},
)?;
// true if the input plan is the UNION, indicating
// the result set produces multiple tables or measurements.
// The UNION operator indicates the result set produces multiple tables or measurements.
let is_multiple_measurements = matches!(plan, LogicalPlan::Union(_));
let plan = plan_with_sort(
@ -445,11 +438,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let schemas = Schemas::new(input.schema())?;
// To be consistent with InfluxQL, exclude measurements
// when there are no matching fields.
// when the projection has no matching fields.
if !fields.iter().any(|f| {
// Walk the expression tree for the field
// looking for a reference to one column that
// is a field
// Walk the expression tree of `f`, looking for a
// reference to at least one column that is a field
walk_expr(&f.expr, &mut |e| match e {
IQLExpr::VarRef(VarRef { name, .. }) => {
match schemas.iox_schema.field_by_name(name.deref().as_str()) {
@ -484,18 +476,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
ctx: &Context<'_>,
input: LogicalPlan,
fields: &[Field],
select_exprs: Vec<Expr>,
mut select_exprs: Vec<Expr>,
group_by_tag_set: &[&str],
schemas: &Schemas,
) -> Result<(LogicalPlan, Vec<Expr>)> {
if !ctx.is_aggregate {
if !ctx.is_aggregate() {
return Ok((input, select_exprs));
}
let Some(time_column_index) = find_time_column_index(fields) else {
return Err(DataFusionError::Internal("unable to find time column".to_owned()))
};
// Find a list of unique aggregate expressions from the projection.
//
// For example, a projection such as:
@ -513,6 +501,65 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// table.
let aggr_exprs = find_aggregate_exprs(&select_exprs);
// This block identifies the time column index and updates the time expression
// based on the semantics of the projection.
let time_column_index = {
let Some(time_column_index) = find_time_column_index(fields) else {
return error::internal("unable to find time column")
};
// Take ownership of the alias, so we don't reallocate, and temporarily place a literal
// `NULL` in its place.
let Expr::Alias(_, alias) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else {
return error::internal("time column is not an alias")
};
// Rewrite the `time` column projection based on a series of rules in the following
// order. If the query:
//
// 1. is binning by time, project the column using the `DATE_BIN` function,
// 2. is a single-selector query, project the `time` field of the selector aggregate,
// 3. otherwise, project the Unix epoch (0)
select_exprs[time_column_index] = if let Some(dim) = ctx.group_by.and_then(|gb| gb.time_dimension()) {
let stride = expr_to_df_interval_dt(&dim.interval)?;
let offset = if let Some(offset) = &dim.offset {
duration_expr_to_nanoseconds(offset)?
} else {
0
};
date_bin(
stride,
"time".as_expr(),
lit(ScalarValue::TimestampNanosecond(Some(offset), None)),
)
} else if let ProjectionType::Selector { has_fields } =
ctx.info.projection_type
{
if has_fields {
return error::not_implemented("projections with a single selector and fields: See https://github.com/influxdata/influxdb_iox/issues/7533");
}
let selector = match aggr_exprs.len() {
1 => aggr_exprs[0].clone(),
len => {
// Should have been validated by `select_statement_info`
return error::internal(format!("internal: expected 1 selector expression, got {len}"));
}
};
Expr::GetIndexedField(GetIndexedField {
expr: Box::new(selector),
key: ScalarValue::Utf8(Some("time".to_owned())),
})
} else {
lit_timestamp_nano(0)
}
.alias(alias);
time_column_index
};
let aggr_group_by_exprs = if let Some(group_by) = ctx.group_by {
let mut group_by_exprs = Vec::new();
@ -573,9 +620,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
_ => {
// The InfluxQL planner adds the `date_bin` function,
// so this condition represents an internal failure.
return Err(DataFusionError::Internal(
"expected DATE_BIN function".to_owned(),
));
return error::internal("expected DATE_BIN function");
}
};
@ -738,11 +783,11 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let limit = limit
.map(|v| <u64 as TryInto<i64>>::try_into(*v))
.transpose()
.map_err(|_| DataFusionError::Plan("limit out of range".to_owned()))?;
.map_err(|_| error::map::query("limit out of range"))?;
let offset = offset
.map(|v| <u64 as TryInto<i64>>::try_into(*v))
.transpose()
.map_err(|_| DataFusionError::Plan("offset out of range".to_owned()))?;
.map_err(|_| error::map::query("offset out of range".to_owned()))?;
// a reference to the ROW_NUMBER column.
let row_alias = IOX_ROW_ALIAS.as_expr();
@ -806,7 +851,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
return Ok(input);
}
Err(DataFusionError::NotImplemented("SLIMIT or SOFFSET".into()))
error::not_implemented("SLIMIT or SOFFSET")
}
/// Map the InfluxQL `SELECT` projection list into a list of DataFusion expressions.
@ -912,79 +957,53 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let iox_schema = &schemas.iox_schema;
match iql {
// rewriter is expected to expand wildcard expressions
IQLExpr::Wildcard(_) => Err(DataFusionError::Internal(
"unexpected wildcard in projection".into(),
)),
IQLExpr::Wildcard(_) => error::internal("unexpected wildcard in projection"),
IQLExpr::VarRef(VarRef {
name,
data_type: opt_dst_type,
}) => {
let name = normalize_identifier(name);
Ok(
if ctx.scope == ExprScope::Where && name.eq_ignore_ascii_case("time") {
// Per the Go implementation, the time column is case-insensitive in the
// `WHERE` clause and disregards any postfix type cast operator.
//
// See: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L5751-L5753
Ok(match (ctx.scope, name.as_str()) {
// Per the Go implementation, the time column is case-insensitive in the
// `WHERE` clause and disregards any postfix type cast operator.
//
// See: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L5751-L5753
(ExprScope::Where, name) if name.eq_ignore_ascii_case("time") => {
"time".as_expr()
} else if ctx.scope == ExprScope::Projection && name == "time" {
if ctx.is_aggregate {
// In the projection, determine whether the query is projecting the time column
// or binning the time.
if let Some(group_by) = ctx.group_by {
if let Some(dim) = group_by.time_dimension() {
let stride = expr_to_df_interval_dt(&dim.interval)?;
let offset = if let Some(offset) = &dim.offset {
duration_expr_to_nanoseconds(offset)?
}
(ExprScope::Projection, "time") => "time".as_expr(),
(_, name) => match iox_schema.field_by_name(name) {
Some((col_type, _)) => {
let column = name.as_expr();
match opt_dst_type {
Some(dst_type) => {
let src_type = column_type_to_var_ref_data_type(col_type);
if src_type == *dst_type {
column
} else if src_type.is_numeric_type()
&& dst_type.is_numeric_type()
{
// InfluxQL only allows casting between numeric types,
// and it is safe to unconditionally unwrap, as the
// `is_numeric_type` call guarantees it can be mapped to
// an Arrow DataType
column.cast_to(
&var_ref_data_type_to_data_type(*dst_type).unwrap(),
&schemas.df_schema,
)?
} else {
0
};
return Ok(date_bin(
stride,
"time".as_expr(),
lit(ScalarValue::TimestampNanosecond(Some(offset), None)),
));
}
}
lit_timestamp_nano(0)
} else {
"time".as_expr()
}
} else {
match iox_schema.field_by_name(&name) {
Some((col_type, _)) => {
let column = name.as_expr();
match opt_dst_type {
Some(dst_type) => {
let src_type = column_type_to_var_ref_data_type(col_type);
if src_type == *dst_type {
column
} else if src_type.is_numeric_type()
&& dst_type.is_numeric_type()
{
// InfluxQL only allows casting between numeric types,
// and it is safe to unconditionally unwrap, as the
// `is_numeric_type` call guarantees it can be mapped to
// an Arrow DataType
column.cast_to(
&var_ref_data_type_to_data_type(*dst_type).unwrap(),
&schemas.df_schema,
)?
} else {
// If the cast is incompatible, evaluates to NULL
Expr::Literal(ScalarValue::Null)
}
// If the cast is incompatible, evaluates to NULL
Expr::Literal(ScalarValue::Null)
}
None => column,
}
None => column,
}
_ => Expr::Literal(ScalarValue::Null),
}
_ => Expr::Literal(ScalarValue::Null),
},
)
})
}
IQLExpr::BindParameter(_) => Err(DataFusionError::NotImplemented("parameter".into())),
IQLExpr::BindParameter(_) => error::not_implemented("parameter"),
IQLExpr::Literal(val) => match val {
Literal::Integer(v) => Ok(lit(*v)),
Literal::Unsigned(v) => Ok(lit(*v)),
@ -995,19 +1014,17 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Some(v.timestamp()),
None,
))),
Literal::Duration(_) => {
Err(DataFusionError::NotImplemented("duration literal".into()))
}
Literal::Duration(_) => error::not_implemented("duration literal"),
Literal::Regex(re) => match ctx.scope {
// a regular expression in a projection list is unexpected,
// as it should have been expanded by the rewriter.
ExprScope::Projection => Err(DataFusionError::Internal(
"unexpected regular expression found in projection".into(),
)),
ExprScope::Projection => {
error::internal("unexpected regular expression found in projection")
}
ExprScope::Where => Ok(lit(clean_non_meta_escapes(re.as_str()))),
},
},
IQLExpr::Distinct(_) => Err(DataFusionError::NotImplemented("DISTINCT".into())),
IQLExpr::Distinct(_) => error::not_implemented("DISTINCT"),
IQLExpr::Call(call) => self.call_to_df_expr(ctx, call, schemas),
IQLExpr::Binary(expr) => self.arithmetic_expr_to_df_expr(ctx, expr, schemas),
IQLExpr::Nested(e) => self.expr_to_df_expr(ctx, e, schemas),
@ -1038,12 +1055,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
match ctx.scope {
ExprScope::Where => {
if call.name.eq_ignore_ascii_case("now") {
Err(DataFusionError::NotImplemented("now".into()))
error::not_implemented("now")
} else {
let name = &call.name;
Err(DataFusionError::External(
format!("invalid function call in condition: {name}").into(),
))
error::query(format!("invalid function call in condition: {name}"))
}
}
ExprScope::Projection => self.function_to_df_expr(ctx, call, schemas),
@ -1059,9 +1074,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fn check_arg_count(name: &str, args: &[IQLExpr], count: usize) -> Result<()> {
let got = args.len();
if got != count {
Err(DataFusionError::Plan(format!(
error::query(format!(
"invalid number of arguments for {name}: expected {count}, got {got}"
)))
))
} else {
Ok(())
}
@ -1069,37 +1084,64 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let Call { name, args } = call;
let expr = self.expr_to_df_expr(ctx, &args[0], schemas)?;
if let Expr::Literal(ScalarValue::Null) = expr {
return Ok(expr);
}
match name.as_str() {
"count" => {
// TODO(sgc): Handle `COUNT DISTINCT` variants
let distinct = false;
check_arg_count("count", args, 1)?;
let expr = self.expr_to_df_expr(ctx, &args[0], schemas)?;
match &expr {
Expr::Literal(ScalarValue::Null) => Ok(expr),
_ => Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
AggregateFunction::Count,
vec![expr],
distinct,
None,
))),
}
Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
AggregateFunction::Count,
vec![expr],
distinct,
None,
)))
}
"sum" | "stddev" | "mean" | "median" => {
check_arg_count(name, args, 1)?;
let expr = self.expr_to_df_expr(ctx, &args[0], schemas)?;
match &expr {
Expr::Literal(ScalarValue::Null) => Ok(expr),
_ => Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
AggregateFunction::from_str(name)?,
vec![expr],
false,
None,
))),
}
Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
AggregateFunction::from_str(name)?,
vec![expr],
false,
None,
)))
}
_ => Err(DataFusionError::Plan(format!("Invalid function '{name}'"))),
name @ ("first" | "last" | "min" | "max") => Ok(
if let ProjectionType::Selector { .. } = ctx.info.projection_type {
// Selector queries use the `struct_selector_<name>`, as they
// will project the value and the time fields of the struct
Expr::GetIndexedField(GetIndexedField {
expr: Box::new(
match name {
"first" => struct_selector_first(),
"last" => struct_selector_last(),
"max" => struct_selector_max(),
"min" => struct_selector_min(),
_ => unreachable!(),
}
.call(vec![expr, "time".as_expr()]),
),
key: ScalarValue::Utf8(Some("value".to_owned())),
})
} else {
// All other queries only require the value of the selector
let data_type = &expr.get_type(&schemas.df_schema)?;
match name {
"first" => selector_first(data_type, SelectorOutput::Value),
"last" => selector_last(data_type, SelectorOutput::Value),
"max" => selector_max(data_type, SelectorOutput::Value),
"min" => selector_min(data_type, SelectorOutput::Value),
_ => unreachable!(),
}
.call(vec![expr, "time".as_expr()])
},
),
_ => error::query(format!("Invalid function '{name}'")),
}
}
@ -1119,9 +1161,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
match BuiltinScalarFunction::from_str(call.name.as_str())? {
BuiltinScalarFunction::Log => {
if args.len() != 2 {
Err(DataFusionError::Plan(
"invalid number of arguments for log, expected 2, got 1".to_owned(),
))
error::query("invalid number of arguments for log, expected 2, got 1")
} else {
Ok(Expr::ScalarFunction {
fun: BuiltinScalarFunction::Log,
@ -1188,13 +1228,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
self.create_table_ref(normalize_identifier(ident))
}
// rewriter is expected to expand the regular expression
MeasurementName::Regex(_) => Err(DataFusionError::Internal(
"unexpected regular expression in FROM clause".into(),
)),
MeasurementName::Regex(_) => error::internal(
"unexpected regular expression in FROM clause",
),
},
MeasurementSelection::Subquery(_) => Err(DataFusionError::NotImplemented(
"subquery in FROM clause".into(),
)),
MeasurementSelection::Subquery(_) => error::not_implemented(
"subquery in FROM clause",
),
}? else { continue };
table_projs.push_back(table_proj);
}
@ -1235,14 +1275,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let mut out = HashSet::new();
for qualified_name in from.iter() {
if qualified_name.database.is_some() {
return Err(DataFusionError::NotImplemented(
"database name in from clause".into(),
));
return error::not_implemented("database name in from clause");
}
if qualified_name.retention_policy.is_some() {
return Err(DataFusionError::NotImplemented(
"retention policy in from clause".into(),
));
return error::not_implemented("retention policy in from clause");
}
match &qualified_name.name {
MeasurementName::Name(name) => {
@ -1272,14 +1308,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fn show_tag_keys_to_plan(&self, show_tag_keys: ShowTagKeysStatement) -> Result<LogicalPlan> {
if show_tag_keys.database.is_some() {
// How do we handle this? Do we need to perform cross-namespace queries here?
return Err(DataFusionError::NotImplemented(
"SHOW TAG KEYS ON <database>".into(),
));
return error::not_implemented("SHOW TAG KEYS ON <database>");
}
if show_tag_keys.condition.is_some() {
return Err(DataFusionError::NotImplemented(
"SHOW TAG KEYS WHERE <condition>".into(),
));
return error::not_implemented("SHOW TAG KEYS WHERE <condition>");
}
let tag_key_col = "tagKey";
@ -1351,9 +1383,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
) -> Result<LogicalPlan> {
if show_field_keys.database.is_some() {
// How do we handle this? Do we need to perform cross-namespace queries here?
return Err(DataFusionError::NotImplemented(
"SHOW FIELD KEYS ON <database>".into(),
));
return error::not_implemented("SHOW FIELD KEYS ON <database>");
}
let field_key_col = "fieldKey";
@ -1431,14 +1461,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
) -> Result<LogicalPlan> {
if show_tag_values.database.is_some() {
// How do we handle this? Do we need to perform cross-namespace queries here?
return Err(DataFusionError::NotImplemented(
"SHOW TAG VALUES ON <database>".into(),
));
return error::not_implemented("SHOW TAG VALUES ON <database>");
}
if show_tag_values.condition.is_some() {
return Err(DataFusionError::NotImplemented(
"SHOW TAG VALUES WHERE <condition>".into(),
));
return error::not_implemented("SHOW TAG VALUES WHERE <condition>");
}
let key_col = "key";
@ -1529,14 +1555,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
) -> Result<LogicalPlan> {
if show_measurements.on.is_some() {
// How do we handle this? Do we need to perform cross-namespace queries here?
return Err(DataFusionError::NotImplemented(
"SHOW MEASUREMENTS ON <database>".into(),
));
return error::not_implemented("SHOW MEASUREMENTS ON <database>");
}
if show_measurements.condition.is_some() {
return Err(DataFusionError::NotImplemented(
"SHOW MEASUREMENTS WHERE <condition>".into(),
));
return error::not_implemented("SHOW MEASUREMENTS WHERE <condition>");
}
let tables = match show_measurements.with_measurement {
@ -1544,17 +1566,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
WithMeasurementClause::Equals(qualified_name)
| WithMeasurementClause::Regex(qualified_name),
) if qualified_name.database.is_some() => {
return Err(DataFusionError::NotImplemented(
"database name in from clause".into(),
));
return error::not_implemented("database name in from clause");
}
Some(
WithMeasurementClause::Equals(qualified_name)
| WithMeasurementClause::Regex(qualified_name),
) if qualified_name.retention_policy.is_some() => {
return Err(DataFusionError::NotImplemented(
"retention policy in from clause".into(),
));
return error::not_implemented("retention policy in from clause");
}
Some(WithMeasurementClause::Equals(qualified_name)) => match qualified_name.name {
MeasurementName::Name(n) => {
@ -1566,16 +1584,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
}
MeasurementName::Regex(_) => {
return Err(DataFusionError::Plan(String::from(
"expected string but got regex",
)));
return error::query("expected string but got regex");
}
},
Some(WithMeasurementClause::Regex(qualified_name)) => match &qualified_name.name {
MeasurementName::Name(_) => {
return Err(DataFusionError::Plan(String::from(
"expected regex but got string",
)));
return error::query("expected regex but got string");
}
MeasurementName::Regex(regex) => {
let regex = parse_regex(regex)?;
@ -1674,9 +1688,7 @@ fn build_gap_fill_node(
// added by the planner.
let (stride, time_range, origin) = if date_bin_args.len() == 3 {
let time_col = date_bin_args[1].try_into_col().map_err(|_| {
DataFusionError::Internal(
"DATE_BIN requires a column as the source argument".to_string(),
)
error::map::internal("DATE_BIN requires a column as the source argument")
})?;
// Ensure that a time range was specified and is valid for gap filling
@ -1703,10 +1715,10 @@ fn build_gap_fill_node(
} else {
// This is an internal error as the date_bin function is added by the planner and should
// always contain the correct number of arguments.
return Err(DataFusionError::Internal(format!(
return error::internal(format!(
"DATE_BIN expects 3 arguments, got {}",
date_bin_args.len()
)));
));
};
let aggr = Aggregate::try_from_plan(&input)?;
@ -1747,7 +1759,7 @@ fn build_gap_fill_node(
fn plan_with_metadata(plan: LogicalPlan, metadata: &InfluxQlMetadata) -> Result<LogicalPlan> {
fn make_schema(schema: DFSchemaRef, metadata: &InfluxQlMetadata) -> Result<DFSchemaRef> {
let data = serde_json::to_string(metadata).map_err(|err| {
DataFusionError::Internal(format!("error serializing InfluxQL metadata: {err}"))
error::map::internal(format!("error serializing InfluxQL metadata: {err}"))
})?;
let mut md = schema.metadata().clone();
@ -1852,22 +1864,13 @@ fn plan_with_metadata(plan: LogicalPlan, metadata: &InfluxQlMetadata) -> Result<
t.projected_schema = make_schema(Arc::clone(&src.projected_schema), metadata)?;
LogicalPlan::TableScan(t)
}
_ => {
return Err(DataFusionError::External(
format!("unexpected LogicalPlan: {}", input.display()).into(),
))
}
_ => return error::internal(format!("unexpected LogicalPlan: {}", input.display())),
})
}
set_schema(&plan, metadata)
}
/// Returns `true` if any expressions refer to an aggregate function.
fn has_aggregate_exprs(fields: &FieldList) -> bool {
fields.iter().any(is_aggregate_field)
}
/// A utility function that checks whether `f` is an
/// aggregate field or not. An aggregate field is one that contains at least one
/// call to an aggregate function.
@ -1970,9 +1973,7 @@ fn conditional_op_to_operator(op: ConditionalOperator) -> Result<Operator> {
ConditionalOperator::And => Ok(Operator::And),
ConditionalOperator::Or => Ok(Operator::Or),
// NOTE: This is not supported by InfluxQL SELECT expressions, so it is unexpected
ConditionalOperator::In => Err(DataFusionError::Internal(
"unexpected binary operator: IN".into(),
)),
ConditionalOperator::In => error::internal("unexpected binary operator: IN"),
}
}
@ -2080,7 +2081,7 @@ fn is_time_field(cond: &ConditionalExpression) -> bool {
fn find_expr(cond: &ConditionalExpression) -> Result<&IQLExpr> {
cond.expr()
.ok_or_else(|| DataFusionError::Internal("incomplete conditional expression".into()))
.ok_or_else(|| error::map::internal("incomplete conditional expression"))
}
fn eval_with_key_clause<'a>(
@ -2123,7 +2124,7 @@ fn eval_with_key_clause<'a>(
#[cfg(test)]
mod test {
use super::*;
use crate::plan::test_utils::{parse_select, MockSchemaProvider};
use crate::plan::test_utils::MockSchemaProvider;
use influxdb_influxql_parser::parse_statements;
use insta::assert_snapshot;
use schema::SchemaBuilder;
@ -2263,6 +2264,84 @@ mod test {
mod select {
use super::*;
mod functions {
use super::*;
#[test]
fn test_selectors() {
// single-selector query
assert_snapshot!(plan("SELECT LAST(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, (selector_last(cpu.usage_idle,cpu.time))[time] AS time, (selector_last(cpu.usage_idle,cpu.time))[value] AS last [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N]
Aggregate: groupBy=[[]], aggr=[[selector_last(cpu.usage_idle, cpu.time)]] [selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
// single-selector, grouping by tags
assert_snapshot!(plan("SELECT LAST(usage_idle) FROM cpu GROUP BY cpu"), @r###"
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, cpu:Dictionary(Int32, Utf8);N, last:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, (selector_last(cpu.usage_idle,cpu.time))[time] AS time, cpu.cpu AS cpu, (selector_last(cpu.usage_idle,cpu.time))[value] AS last [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, cpu:Dictionary(Int32, Utf8);N, last:Float64;N]
Aggregate: groupBy=[[cpu.cpu]], aggr=[[selector_last(cpu.usage_idle, cpu.time)]] [cpu:Dictionary(Int32, Utf8);N, selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
// aggregate query, as we're grouping by time
assert_snapshot!(plan("SELECT LAST(usage_idle) FROM cpu GROUP BY TIME(5s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, selector_last_value(cpu.usage_idle,cpu.time) AS last [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N]
GapFill: groupBy=[[time]], aggr=[[selector_last_value(cpu.usage_idle,cpu.time)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, selector_last_value(cpu.usage_idle,cpu.time):Float64;N]
Aggregate: groupBy=[[datebin(IntervalMonthDayNano("5000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[selector_last_value(cpu.usage_idle, cpu.time)]] [time:Timestamp(Nanosecond, None);N, selector_last_value(cpu.usage_idle,cpu.time):Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
// aggregate query, grouping by time with gap filling
assert_snapshot!(plan("SELECT FIRST(usage_idle) FROM cpu GROUP BY TIME(5s) FILL(0)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, coalesce(selector_first_value(cpu.usage_idle,cpu.time), Float64(0)) AS first [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N]
GapFill: groupBy=[[time]], aggr=[[selector_first_value(cpu.usage_idle,cpu.time)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, selector_first_value(cpu.usage_idle,cpu.time):Float64;N]
Aggregate: groupBy=[[datebin(IntervalMonthDayNano("5000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[selector_first_value(cpu.usage_idle, cpu.time)]] [time:Timestamp(Nanosecond, None);N, selector_first_value(cpu.usage_idle,cpu.time):Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
// aggregate query, as we're specifying multiple selectors or aggregates
assert_snapshot!(plan("SELECT LAST(usage_idle), FIRST(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), last:Float64;N, first:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, TimestampNanosecond(0, None) AS time, selector_last_value(cpu.usage_idle,cpu.time) AS last, selector_first_value(cpu.usage_idle,cpu.time) AS first [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), last:Float64;N, first:Float64;N]
Aggregate: groupBy=[[]], aggr=[[selector_last_value(cpu.usage_idle, cpu.time), selector_first_value(cpu.usage_idle, cpu.time)]] [selector_last_value(cpu.usage_idle,cpu.time):Float64;N, selector_first_value(cpu.usage_idle,cpu.time):Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT LAST(usage_idle), COUNT(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), last:Float64;N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, TimestampNanosecond(0, None) AS time, selector_last_value(cpu.usage_idle,cpu.time) AS last, COUNT(cpu.usage_idle) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), last:Float64;N, count:Int64;N]
Aggregate: groupBy=[[]], aggr=[[selector_last_value(cpu.usage_idle, cpu.time), COUNT(cpu.usage_idle)]] [selector_last_value(cpu.usage_idle,cpu.time):Float64;N, COUNT(cpu.usage_idle):Int64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
// not implemented
// See: https://github.com/influxdata/influxdb_iox/issues/7533
assert_snapshot!(plan("SELECT LAST(usage_idle), usage_system FROM cpu"), @"This feature is not implemented: projections with a single selector and fields: See https://github.com/influxdata/influxdb_iox/issues/7533");
// Validate we can call the remaining supported selector functions
assert_snapshot!(plan("SELECT FIRST(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, (selector_first(cpu.usage_idle,cpu.time))[time] AS time, (selector_first(cpu.usage_idle,cpu.time))[value] AS first [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N]
Aggregate: groupBy=[[]], aggr=[[selector_first(cpu.usage_idle, cpu.time)]] [selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT MAX(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, max:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, (selector_max(cpu.usage_idle,cpu.time))[time] AS time, (selector_max(cpu.usage_idle,cpu.time))[value] AS max [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, max:Float64;N]
Aggregate: groupBy=[[]], aggr=[[selector_max(cpu.usage_idle, cpu.time)]] [selector_max(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT MIN(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, min:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, (selector_min(cpu.usage_idle,cpu.time))[time] AS time, (selector_min(cpu.usage_idle,cpu.time))[value] AS min [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, min:Float64;N]
Aggregate: groupBy=[[]], aggr=[[selector_min(cpu.usage_idle, cpu.time)]] [selector_min(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
}
}
/// Test InfluxQL-specific behaviour of scalar functions that differ
/// from DataFusion
#[test]
@ -3321,34 +3400,4 @@ mod test {
assert_snapshot!(plan("SELECT time, f64_field, i64_Field FROM data"));
}
}
#[test]
fn test_has_aggregate_exprs() {
let sel = parse_select("SELECT count(usage) FROM cpu");
assert!(has_aggregate_exprs(&sel.fields));
// Can be part of a complex expression
let sel = parse_select("SELECT sum(usage) + count(usage) FROM cpu");
assert!(has_aggregate_exprs(&sel.fields));
// Can be mixed with scalar columns
let sel = parse_select("SELECT idle, first(usage) FROM cpu");
assert!(has_aggregate_exprs(&sel.fields));
// Are case insensitive
let sel = parse_select("SELECT Count(usage) FROM cpu");
assert!(has_aggregate_exprs(&sel.fields));
// Returns false where it is not a valid aggregate function
let sel = parse_select("SELECT foo(usage) FROM cpu");
assert!(!has_aggregate_exprs(&sel.fields));
// Returns false when it is a math function
let sel = parse_select("SELECT abs(usage) FROM cpu");
assert!(!has_aggregate_exprs(&sel.fields));
// Returns false when there are only scalar functions
let sel = parse_select("SELECT usage, idle FROM cpu");
assert!(!has_aggregate_exprs(&sel.fields));
}
}

View File

@ -1,8 +1,9 @@
use crate::plan::error;
use crate::plan::timestamp::parse_timestamp;
use crate::plan::util::binary_operator_to_df_operator;
use datafusion::common::{DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{binary_expr, lit, now, BinaryExpr, Expr as DFExpr, Operator};
use influxdb_influxql_parser::expression::{Binary, BinaryOperator};
use influxdb_influxql_parser::expression::{Binary, BinaryOperator, Call};
use influxdb_influxql_parser::{expression::Expr, literal::Literal};
type ExprResult = Result<DFExpr>;
@ -62,11 +63,7 @@ pub(in crate::plan) fn time_range_to_df_expr(expr: &Expr, tz: Option<chrono_tz::
DFExpr::Literal(ScalarValue::Int64(Some(v))) => {
DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(v), None))
}
_ => {
return Err(DataFusionError::Plan(
"invalid time range expression".into(),
))
}
_ => return error::query("invalid time range expression"),
})
}
@ -85,13 +82,13 @@ pub(super) fn duration_expr_to_nanoseconds(expr: &Expr) -> Result<i64> {
DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(v))) => Ok(v as i64),
DFExpr::Literal(ScalarValue::Float64(Some(v))) => Ok(v as i64),
DFExpr::Literal(ScalarValue::Int64(Some(v))) => Ok(v),
_ => Err(DataFusionError::Plan("invalid duration expression".into())),
_ => error::query("invalid duration expression"),
}
}
fn map_expr_err(expr: &Expr) -> impl Fn(DataFusionError) -> DataFusionError + '_ {
move |err| {
DataFusionError::Plan(format!(
error::map::query(format!(
"invalid expression \"{}\": {}",
expr,
match err {
@ -105,11 +102,11 @@ fn map_expr_err(expr: &Expr) -> impl Fn(DataFusionError) -> DataFusionError + '_
fn reduce_expr(expr: &Expr, tz: Option<chrono_tz::Tz>) -> ExprResult {
match expr {
Expr::Binary(v) => reduce_binary_expr(v, tz).map_err(map_expr_err(expr)),
Expr::Call (v) => {
if !v.name.eq_ignore_ascii_case("now") {
return Err(DataFusionError::Plan(
format!("invalid function call '{}'", v.name),
));
Expr::Call (Call { name, .. }) => {
if !name.eq_ignore_ascii_case("now") {
return error::query(
format!("invalid function call '{name}'"),
);
}
Ok(now())
}
@ -123,14 +120,14 @@ fn reduce_expr(expr: &Expr, tz: Option<chrono_tz::Tz>) -> ExprResult {
None,
))),
Literal::Duration(v) => Ok(lit(ScalarValue::new_interval_mdn(0, 0, **v))),
_ => Err(DataFusionError::Plan(format!(
_ => error::query(format!(
"found literal '{val}', expected duration, float, integer, or timestamp string"
))),
)),
},
Expr::VarRef { .. } | Expr::BindParameter(_) | Expr::Wildcard(_) | Expr::Distinct(_) => Err(DataFusionError::Plan(format!(
Expr::VarRef { .. } | Expr::BindParameter(_) | Expr::Wildcard(_) | Expr::Distinct(_) => error::query(format!(
"found symbol '{expr}', expected now() or a literal duration, float, integer and timestamp string"
))),
)),
}
}
@ -190,9 +187,7 @@ fn reduce_binary_lhs_duration_df_expr(
BinaryOperator::Sub => {
Ok(lit(ScalarValue::new_interval_mdn(0, 0, (lhs - *d) as i64)))
}
_ => Err(DataFusionError::Plan(format!(
"found operator '{op}', expected +, -"
))),
_ => error::query(format!("found operator '{op}', expected +, -")),
},
// durations may only be scaled by float literals
ScalarValue::Float64(Some(v)) => {
@ -205,9 +200,7 @@ fn reduce_binary_lhs_duration_df_expr(
BinaryOperator::Div => {
Ok(lit(ScalarValue::new_interval_mdn(0, 0, lhs as i64 / *v)))
}
_ => Err(DataFusionError::Plan(format!(
"found operator '{op}', expected *, /"
))),
_ => error::query(format!("found operator '{op}', expected *, /")),
},
// A timestamp may be added to a duration
ScalarValue::TimestampNanosecond(Some(v), _) if matches!(op, BinaryOperator::Add) => {
@ -221,16 +214,16 @@ fn reduce_binary_lhs_duration_df_expr(
}
// This should not occur, as all the DataFusion literal values created by this process
// are handled above.
_ => Err(DataFusionError::Internal(format!(
_ => error::internal(format!(
"unexpected DataFusion literal '{rhs}' for duration expression"
))),
)),
},
DFExpr::ScalarFunction { .. } => reduce_binary_scalar_df_expr(
&expr_to_interval_df_expr(&lit(ScalarValue::new_interval_mdn(0, 0, lhs as i64)), tz)?,
op,
rhs,
),
_ => Err(DataFusionError::Plan("invalid duration expression".into())),
_ => error::query("invalid duration expression"),
}
}
@ -261,7 +254,7 @@ fn reduce_binary_lhs_integer_df_expr(
DFExpr::Literal(ScalarValue::Utf8(Some(s))) => {
reduce_binary_lhs_duration_df_expr(lhs.into(), op, &parse_timestamp_df_expr(s, tz)?, tz)
}
_ => Err(DataFusionError::Plan("invalid integer expression".into())),
_ => error::query("invalid integer expression"),
}
}
@ -272,17 +265,13 @@ fn reduce_binary_lhs_integer_df_expr(
/// ```
fn reduce_binary_lhs_float_df_expr(lhs: f64, op: BinaryOperator, rhs: &DFExpr) -> ExprResult {
Ok(lit(match rhs {
DFExpr::Literal(ScalarValue::Float64(Some(rhs))) => {
op.try_reduce(lhs, *rhs).ok_or_else(|| {
DataFusionError::Plan("invalid operator for float expression".to_string())
})?
}
DFExpr::Literal(ScalarValue::Int64(Some(rhs))) => {
op.try_reduce(lhs, *rhs).ok_or_else(|| {
DataFusionError::Plan("invalid operator for float expression".to_string())
})?
}
_ => return Err(DataFusionError::Plan("invalid float expression".into())),
DFExpr::Literal(ScalarValue::Float64(Some(rhs))) => op
.try_reduce(lhs, *rhs)
.ok_or_else(|| error::map::query("invalid operator for float expression"))?,
DFExpr::Literal(ScalarValue::Int64(Some(rhs))) => op
.try_reduce(lhs, *rhs)
.ok_or_else(|| error::map::query("invalid operator for float expression"))?,
_ => return error::query("invalid float expression"),
}))
}
@ -306,9 +295,9 @@ fn reduce_binary_lhs_timestamp_df_expr(
DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(d))) => match op {
BinaryOperator::Add => Ok(lit(ScalarValue::TimestampNanosecond(Some(lhs + *d as i64), None))),
BinaryOperator::Sub => Ok(lit(ScalarValue::TimestampNanosecond(Some(lhs - *d as i64), None))),
_ => Err(DataFusionError::Plan(
_ => error::query(
format!("invalid operator '{op}' for timestamp and duration: expected +, -"),
)),
),
}
DFExpr::Literal(ScalarValue::Int64(_))
// NOTE: This is a slight deviation from InfluxQL, for which the only valid binary
@ -321,9 +310,9 @@ fn reduce_binary_lhs_timestamp_df_expr(
&expr_to_interval_df_expr(rhs, tz)?,
tz,
),
_ => Err(DataFusionError::Plan(
_ => error::query(
format!("invalid expression '{rhs}': expected duration, integer or timestamp string"),
)),
),
}
}
@ -335,9 +324,7 @@ fn reduce_binary_scalar_df_expr(lhs: &DFExpr, op: BinaryOperator, rhs: &DFExpr)
match op {
BinaryOperator::Add => Ok(binary_expr(lhs.clone(), Operator::Plus, rhs.clone())),
BinaryOperator::Sub => Ok(binary_expr(lhs.clone(), Operator::Minus, rhs.clone())),
_ => Err(DataFusionError::Plan(format!(
"found operator '{op}', expected +, -"
))),
_ => error::query(format!("found operator '{op}', expected +, -")),
}
}
@ -351,11 +338,7 @@ fn expr_to_interval_df_expr(expr: &DFExpr, tz: Option<chrono_tz::Tz>) -> ExprRes
DFExpr::Literal(ScalarValue::Int64(Some(v))) => *v,
DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
DFExpr::Literal(ScalarValue::Utf8(Some(s))) => parse_timestamp_nanos(s, tz)?,
_ => {
return Err(DataFusionError::Plan(format!(
"unable to cast '{expr}' to duration"
)))
}
_ => return error::query(format!("unable to cast '{expr}' to duration")),
},
)))
}
@ -383,16 +366,16 @@ fn reduce_binary_lhs_string_df_expr(
| DFExpr::Literal(ScalarValue::Int64(_)) => {
reduce_binary_lhs_timestamp_df_expr(parse_timestamp_nanos(lhs, tz)?, op, rhs, tz)
}
_ => Err(DataFusionError::Plan(format!(
_ => error::query(format!(
"found '{rhs}', expected duration, integer or timestamp string"
))),
)),
}
}
fn parse_timestamp_nanos(s: &str, tz: Option<chrono_tz::Tz>) -> Result<i64> {
parse_timestamp(s, tz)
.map(|ts| ts.timestamp_nanos())
.map_err(|_| DataFusionError::Plan(format!("'{s}' is not a valid timestamp")))
.map_err(|_| error::map::query(format!("'{s}' is not a valid timestamp")))
}
/// Parse s as a timestamp in the specified timezone and return the timestamp
@ -593,7 +576,6 @@ mod test {
];
for (interval_str, expected_scalar) in cases {
println!("Parsing {interval_str}, expecting {expected_scalar:?}");
let parsed_interval = parse(interval_str).unwrap();
let DFExpr::Literal(actual_scalar) = parsed_interval else {
panic!("Expected literal Expr, got {parsed_interval:?}");

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,8 @@
//! APIs for testing.
#![cfg(test)]
use crate::plan::SchemaProvider;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
use crate::plan::{error, SchemaProvider};
use datafusion::common::Result as DataFusionResult;
use datafusion::datasource::empty::EmptyTable;
use datafusion::datasource::provider_as_source;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, TableSource};
@ -156,7 +156,7 @@ impl SchemaProvider for MockSchemaProvider {
self.tables
.get(name)
.map(|(t, _)| Arc::clone(t))
.ok_or_else(|| DataFusionError::Plan(format!("measurement does not exist: {name}")))
.ok_or_else(|| error::map::query(format!("measurement does not exist: {name}")))
}
fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {

View File

@ -1,5 +1,6 @@
use crate::plan::error;
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Offset, TimeZone};
use datafusion::common::{DataFusionError, Result};
use datafusion::common::Result;
/// Parse the timestamp string and return a DateTime in UTC.
fn parse_timestamp_utc(s: &str) -> Result<DateTime<FixedOffset>> {
@ -19,7 +20,7 @@ fn parse_timestamp_utc(s: &str) -> Result<DateTime<FixedOffset>> {
.map(|nd| nd.and_time(NaiveTime::default())),
)
.map(|ts| DateTime::from_utc(ts, chrono::Utc.fix()))
.map_err(|_| DataFusionError::Plan("invalid timestamp string".into()))
.map_err(|_| error::map::query("invalid timestamp string"))
}
/// Parse the timestamp string and return a DateTime in the specified timezone.
@ -50,7 +51,7 @@ fn parse_timestamp_tz(s: &str, tz: chrono_tz::Tz) -> Result<DateTime<FixedOffset
.ok_or(())
})
.map(|ts| ts.with_timezone(&ts.offset().fix()))
.map_err(|_| DataFusionError::Plan("invalid timestamp string".into()))
.map_err(|_| error::map::query("invalid timestamp string"))
}
/// Parse the string and return a `DateTime` using a fixed offset.

View File

@ -1,6 +1,6 @@
use crate::plan::util_copy;
use crate::plan::{error, util_copy};
use arrow::datatypes::DataType;
use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result};
use datafusion::common::{DFSchema, DFSchemaRef, Result};
use datafusion::logical_expr::utils::expr_as_column_expr;
use datafusion::logical_expr::{coalesce, lit, Expr, ExprSchemable, LogicalPlan, Operator};
use influxdb_influxql_parser::expression::BinaryOperator;
@ -27,7 +27,7 @@ pub(in crate::plan) fn binary_operator_to_df_operator(op: BinaryOperator) -> Ope
pub(in crate::plan) fn schema_from_df(schema: &DFSchema) -> Result<Schema> {
let s: Arc<arrow::datatypes::Schema> = Arc::new(schema.into());
s.try_into().map_err(|err| {
DataFusionError::Internal(format!(
error::map::internal(format!(
"unable to convert DataFusion schema to IOx schema: {err}"
))
})
@ -51,9 +51,8 @@ impl Schemas {
/// Sanitize an InfluxQL regular expression and create a compiled [`regex::Regex`].
pub(crate) fn parse_regex(re: &Regex) -> Result<regex::Regex> {
let pattern = clean_non_meta_escapes(re.as_str());
regex::Regex::new(&pattern).map_err(|e| {
DataFusionError::External(format!("invalid regular expression '{re}': {e}").into())
})
regex::Regex::new(&pattern)
.map_err(|e| error::map::query(format!("invalid regular expression '{re}': {e}")))
}
/// Returns `n` as a literal expression of the specified `data_type`.
@ -67,9 +66,7 @@ fn number_to_expr(n: &Number, data_type: DataType) -> Result<Expr> {
(Number::Float(v), DataType::UInt64) => lit(*v as u64),
(n, data_type) => {
// The only output data types expected are Int64, Float64 or UInt64
return Err(DataFusionError::Internal(format!(
"no conversion from {n} to {data_type}"
)));
return error::internal(format!("no conversion from {n} to {data_type}"));
}
})
}

View File

@ -7,7 +7,7 @@
//! If these APIs are stabilised and made public, they can be removed from IOx.
//!
//! NOTE
use datafusion::common::{DataFusionError, Result};
use datafusion::common::Result;
use datafusion::logical_expr::{
expr::{
AggregateFunction, Between, BinaryExpr, Case, Cast, Expr, GetIndexedField, GroupingSet,