feat: add limited `last`, `first`, `min` and `max` selector functions

Until #7533 is implemented, attempting to execute a selector query that projects a single selector function alongside additional tags or fields returns a `NotImplemented` error. Introduced an `error` module to simplify error handling and keep error messages consistent.

parent 03ea8ea2b8
commit 69d75745cc
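To make the new behaviour concrete, here is a minimal sketch (not code from this commit) of the guard the planner applies: when the analysed projection is a single selector accompanied by extra tags or fields, planning stops with the same `NotImplemented` error the new `error` module produces. The free function and its boolean arguments are illustrative; only `DataFusionError::NotImplemented` and the #7533 message mirror the diff below.

```rust
use datafusion::common::{DataFusionError, Result};

/// Illustrative only: mirrors the guard added in the planner, where a single
/// selector projected together with additional tags or fields is rejected
/// until #7533 is implemented.
fn reject_selector_with_fields(is_single_selector: bool, has_fields: bool) -> Result<()> {
    if is_single_selector && has_fields {
        return Err(DataFusionError::NotImplemented(
            "projections with a single selector and fields: See https://github.com/influxdata/influxdb_iox/issues/7533"
                .to_owned(),
        ));
    }
    Ok(())
}
```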
@@ -2950,6 +2950,7 @@ dependencies = [
 "serde",
 "serde_json",
 "test_helpers",
 "thiserror",
 "workspace-hack",
]
@@ -422,3 +422,33 @@ SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-1

-- aggregate, group by TIME and tag, multiple measurements
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device LIMIT 1;

--
-- Selector functions
--

-- Single selectors, which should produce a non-zero timestamp
-- for the matching value of the selector function.
SELECT FIRST(usage_idle) FROM cpu;
SELECT LAST(usage_idle) FROM cpu;
SELECT MAX(usage_idle) FROM cpu;
SELECT MIN(usage_idle) FROM cpu;

-- Single selectors with a GROUP BY tag should produce non-zero
-- timestamps for the values in each group
SELECT FIRST(usage_idle) FROM cpu GROUP BY cpu;
SELECT LAST(usage_idle) FROM cpu GROUP BY cpu;
SELECT MAX(usage_idle) FROM cpu GROUP BY cpu;
SELECT MIN(usage_idle) FROM cpu GROUP BY cpu;

-- Adding group by time means the timestamp
-- should be the start of each window
SELECT FIRST(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:00:30Z' GROUP BY TIME(10s);

-- Multiple selectors result in an aggregate query and therefore
-- a zero timestamp value.
SELECT FIRST(usage_idle), MAX(usage_idle) FROM cpu;

-- Mixing selectors and aggregates,
-- timestamp should be start of each window
SELECT FIRST(usage_idle), COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device LIMIT 1;
@@ -2191,3 +2191,157 @@ tags: cpu=, device=disk1s5
+---------------------+-------+---------+
| 2022-10-31T02:00:00 |       | 2       |
+---------------------+-------+---------+
-- InfluxQL: SELECT FIRST(usage_idle) FROM cpu;
name: cpu
+---------------------+-------+
| time                | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 2.98  |
+---------------------+-------+
-- InfluxQL: SELECT LAST(usage_idle) FROM cpu;
name: cpu
+---------------------+------+
| time                | last |
+---------------------+------+
| 2022-10-31T02:00:10 | 2.99 |
+---------------------+------+
-- InfluxQL: SELECT MAX(usage_idle) FROM cpu;
name: cpu
+---------------------+------+
| time                | max  |
+---------------------+------+
| 2022-10-31T02:00:10 | 2.99 |
+---------------------+------+
-- InfluxQL: SELECT MIN(usage_idle) FROM cpu;
name: cpu
+---------------------+------+
| time                | min  |
+---------------------+------+
| 2022-10-31T02:00:00 | 0.98 |
+---------------------+------+
-- InfluxQL: SELECT FIRST(usage_idle) FROM cpu GROUP BY cpu;
name: cpu
tags: cpu=cpu-total
+---------------------+-------+
| time                | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 2.98  |
+---------------------+-------+
name: cpu
tags: cpu=cpu0
+---------------------+-------+
| time                | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 0.98  |
+---------------------+-------+
name: cpu
tags: cpu=cpu1
+---------------------+-------+
| time                | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 1.98  |
+---------------------+-------+
-- InfluxQL: SELECT LAST(usage_idle) FROM cpu GROUP BY cpu;
name: cpu
tags: cpu=cpu-total
+---------------------+------+
| time                | last |
+---------------------+------+
| 2022-10-31T02:00:10 | 2.99 |
+---------------------+------+
name: cpu
tags: cpu=cpu0
+---------------------+------+
| time                | last |
+---------------------+------+
| 2022-10-31T02:00:10 | 0.99 |
+---------------------+------+
name: cpu
tags: cpu=cpu1
+---------------------+------+
| time                | last |
+---------------------+------+
| 2022-10-31T02:00:10 | 1.99 |
+---------------------+------+
-- InfluxQL: SELECT MAX(usage_idle) FROM cpu GROUP BY cpu;
name: cpu
tags: cpu=cpu-total
+---------------------+------+
| time                | max  |
+---------------------+------+
| 2022-10-31T02:00:10 | 2.99 |
+---------------------+------+
name: cpu
tags: cpu=cpu0
+---------------------+------+
| time                | max  |
+---------------------+------+
| 2022-10-31T02:00:10 | 0.99 |
+---------------------+------+
name: cpu
tags: cpu=cpu1
+---------------------+------+
| time                | max  |
+---------------------+------+
| 2022-10-31T02:00:10 | 1.99 |
+---------------------+------+
-- InfluxQL: SELECT MIN(usage_idle) FROM cpu GROUP BY cpu;
name: cpu
tags: cpu=cpu-total
+---------------------+------+
| time                | min  |
+---------------------+------+
| 2022-10-31T02:00:00 | 2.98 |
+---------------------+------+
name: cpu
tags: cpu=cpu0
+---------------------+------+
| time                | min  |
+---------------------+------+
| 2022-10-31T02:00:00 | 0.98 |
+---------------------+------+
name: cpu
tags: cpu=cpu1
+---------------------+------+
| time                | min  |
+---------------------+------+
| 2022-10-31T02:00:00 | 1.98 |
+---------------------+------+
-- InfluxQL: SELECT FIRST(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:00:30Z' GROUP BY TIME(10s);
name: cpu
+---------------------+-------+
| time                | first |
+---------------------+-------+
| 2022-10-31T02:00:00 | 2.98  |
| 2022-10-31T02:00:10 | 2.99  |
| 2022-10-31T02:00:20 |       |
+---------------------+-------+
-- InfluxQL: SELECT FIRST(usage_idle), MAX(usage_idle) FROM cpu;
name: cpu
+---------------------+-------+------+
| time                | first | max  |
+---------------------+-------+------+
| 1970-01-01T00:00:00 | 2.98  | 2.99 |
+---------------------+-------+------+
-- InfluxQL: SELECT FIRST(usage_idle), COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device LIMIT 1;
name: cpu
tags: cpu=cpu-total, device=
+---------------------+-------+-------+
| time                | first | count |
+---------------------+-------+-------+
| 2022-10-31T02:00:00 | 2.98  | 2     |
+---------------------+-------+-------+
name: cpu
tags: cpu=cpu0, device=
+---------------------+-------+-------+
| time                | first | count |
+---------------------+-------+-------+
| 2022-10-31T02:00:00 | 0.98  | 2     |
+---------------------+-------+-------+
name: cpu
tags: cpu=cpu1, device=
+---------------------+-------+-------+
| time                | first | count |
+---------------------+-------+-------+
| 2022-10-31T02:00:00 | 1.98  | 2     |
+---------------------+-------+-------+
@@ -22,6 +22,7 @@ query_functions = { path = "../query_functions"}
regex = "1"
schema = { path = "../schema" }
serde_json = "1.0.96"
thiserror = "1.0"
workspace-hack = { version = "0.1", path = "../workspace-hack" }

[dev-dependencies] # In alphabetical order
@@ -0,0 +1,45 @@
use datafusion::common::Result;

/// An error that was the result of an invalid InfluxQL query.
pub(crate) fn query<T>(s: impl Into<String>) -> Result<T> {
    Err(map::query(s))
}

/// An unexpected error whilst planning that represents a bug in IOx.
pub(crate) fn internal<T>(s: impl Into<String>) -> Result<T> {
    Err(map::internal(s))
}

/// The specified `feature` is not implemented.
pub(crate) fn not_implemented<T>(feature: impl Into<String>) -> Result<T> {
    Err(map::not_implemented(feature))
}

/// Functions that return a DataFusionError rather than a `Result<T, DataFusionError>`
/// making them convenient to use with functions like `map_err`.
pub(crate) mod map {
    use datafusion::common::DataFusionError;
    use thiserror::Error;

    #[derive(Debug, Error)]
    enum PlannerError {
        /// An unexpected error that represents a bug in IOx.
        #[error("internal: {0}")]
        Internal(String),
    }

    /// An error that was the result of an invalid InfluxQL query.
    pub(crate) fn query(s: impl Into<String>) -> DataFusionError {
        DataFusionError::Plan(s.into())
    }

    /// An unexpected error whilst planning that represents a bug in IOx.
    pub(crate) fn internal(s: impl Into<String>) -> DataFusionError {
        DataFusionError::External(Box::new(PlannerError::Internal(s.into())))
    }

    /// The specified `feature` is not implemented.
    pub(crate) fn not_implemented(feature: impl Into<String>) -> DataFusionError {
        DataFusionError::NotImplemented(feature.into())
    }
}
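A hedged usage sketch for the module above, assuming it lives at `crate::plan::error` as the `mod` change below suggests: the top-level helpers return `Err` directly, while the `map` variants return a bare `DataFusionError`, which makes them fit `ok_or_else`/`map_err`. `find_time_column` and `selector_count` are hypothetical helpers used only for illustration.

```rust
use crate::plan::error;
use datafusion::common::Result;

/// Hypothetical lookup, present only to demonstrate the error helpers.
fn find_time_column(columns: &[&str]) -> Option<usize> {
    columns.iter().position(|c| *c == "time")
}

fn time_column_index(columns: &[&str]) -> Result<usize> {
    // `error::map::internal` yields a DataFusionError directly, so it slots
    // straight into `ok_or_else` without wrapping in Err(..) ourselves.
    find_time_column(columns).ok_or_else(|| error::map::internal("unable to find time column"))
}

fn selector_count(selectors: usize) -> Result<usize> {
    match selectors {
        // The Result-returning helpers are used where we bail out early.
        0 => error::query("expected at least one selector function"),
        n => Ok(n),
    }
}
```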
@@ -2,6 +2,7 @@ mod expr_type_evaluator;
mod field;
mod field_mapper;
mod planner;
mod error;
mod planner_rewrite_expression;
mod planner_time_range_expression;
mod rewriter;
@@ -1,5 +1,6 @@
mod select;

use crate::plan::error;
use crate::plan::planner::select::{
    check_exprs_satisfy_columns, fields_to_exprs_no_nulls, make_tag_key_column_meta,
    plan_with_sort, ToSortExpr,
@@ -8,7 +9,9 @@ use crate::plan::planner_rewrite_expression::{rewrite_conditional, rewrite_expr}
use crate::plan::planner_time_range_expression::{
    duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr,
};
use crate::plan::rewriter::rewrite_statement;
use crate::plan::rewriter::{
    rewrite_statement, select_statement_info, ProjectionType, SelectStatementInfo,
};
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_to_data_type};
use arrow::array::StringBuilder;
@@ -56,6 +59,7 @@ use iox_query::exec::gapfill::{FillStrategy, GapFill, GapFillParams};
use iox_query::logical_optimizer::range_predicate::find_time_range;
use itertools::Itertools;
use once_cell::sync::Lazy;
use query_functions::selectors::{struct_selector_first, struct_selector_max, struct_selector_min};
use query_functions::{clean_non_meta_escapes, selectors::struct_selector_last};
use schema::{
    InfluxColumnType, InfluxFieldType, Schema, INFLUXQL_MEASUREMENT_COLUMN_NAME,
@@ -122,8 +126,7 @@ struct Context<'a> {
    scope: ExprScope,
    tz: Option<Tz>,

    /// `true` if the query projection specifies aggregate expressions.
    is_aggregate: bool,
    info: SelectStatementInfo,

    // GROUP BY information
    group_by: Option<&'a GroupByClause>,
@@ -131,8 +134,11 @@ struct Context<'a> {
}

impl<'a> Context<'a> {
    fn new() -> Self {
        Default::default()
    fn new(info: SelectStatementInfo) -> Self {
        Self {
            info,
            ..Default::default()
        }
    }

    fn with_scope(&self, scope: ExprScope) -> Self {
@@ -152,16 +158,16 @@ impl<'a> Context<'a> {
        }
    }

    fn with_is_aggregate(&self, is_aggregate: bool) -> Self {
        Self {
            is_aggregate,
            ..*self
        }
    }

    fn fill(&self) -> FillClause {
        self.fill.unwrap_or_default()
    }

    fn is_aggregate(&self) -> bool {
        matches!(
            self.info.projection_type,
            ProjectionType::Aggregate | ProjectionType::Selector { .. }
        )
    }
}

#[allow(missing_debug_implementations)]
@@ -177,33 +183,20 @@ impl<'a> InfluxQLToLogicalPlan<'a> {

    pub fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
        match statement {
            Statement::CreateDatabase(_) => {
                Err(DataFusionError::NotImplemented("CREATE DATABASE".into()))
            }
            Statement::Delete(_) => Err(DataFusionError::NotImplemented("DELETE".into())),
            Statement::DropMeasurement(_) => {
                Err(DataFusionError::NotImplemented("DROP MEASUREMENT".into()))
            }
            Statement::CreateDatabase(_) => error::not_implemented("CREATE DATABASE"),
            Statement::Delete(_) => error::not_implemented("DELETE"),
            Statement::DropMeasurement(_) => error::not_implemented("DROP MEASUREMENT"),
            Statement::Explain(explain) => self.explain_statement_to_plan(*explain),
            Statement::Select(select) => self.select_statement_to_plan(
                &Context::new(),
                &self.rewrite_select_statement(*select)?,
            ),
            Statement::ShowDatabases(_) => {
                Err(DataFusionError::NotImplemented("SHOW DATABASES".into()))
            Statement::Select(select) => {
                self.select_statement_to_plan(&self.rewrite_select_statement(*select)?)
            }
            Statement::ShowMeasurements(_) => {
                Err(DataFusionError::NotImplemented("SHOW MEASUREMENTS".into()))
            }
            Statement::ShowRetentionPolicies(_) => Err(DataFusionError::NotImplemented(
                "SHOW RETENTION POLICIES".into(),
            )),
            Statement::ShowTagKeys(_) => {
                Err(DataFusionError::NotImplemented("SHOW TAG KEYS".into()))
            }
            Statement::ShowTagValues(_) => {
                Err(DataFusionError::NotImplemented("SHOW TAG VALUES".into()))
            Statement::ShowDatabases(_) => error::not_implemented("SHOW DATABASES"),
            Statement::ShowMeasurements(_) => error::not_implemented("SHOW MEASUREMENTS"),
            Statement::ShowRetentionPolicies(_) => {
                error::not_implemented("SHOW RETENTION POLICIES")
            }
            Statement::ShowTagKeys(_) => error::not_implemented("SHOW TAG KEYS"),
            Statement::ShowTagValues(_) => error::not_implemented("SHOW TAG VALUES"),
            Statement::ShowFieldKeys(show_field_keys) => {
                self.show_field_keys_to_plan(*show_field_keys)
            }
@@ -211,10 +204,8 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
    }

    fn explain_statement_to_plan(&self, explain: ExplainStatement) -> Result<LogicalPlan> {
        let plan = self.select_statement_to_plan(
            &Context::new(),
            &self.rewrite_select_statement(*explain.select)?,
        )?;
        let plan =
            self.select_statement_to_plan(&self.rewrite_select_statement(*explain.select)?)?;
        let plan = Arc::new(plan);
        let schema = LogicalPlan::explain_schema();
        let schema = schema.to_dfschema_ref()?;
@@ -265,21 +256,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
    }

    /// Create a [`LogicalPlan`] from the specified InfluxQL `SELECT` statement.
    fn select_statement_to_plan(
        &self,
        ctx: &Context<'_>,
        select: &SelectStatement,
    ) -> Result<LogicalPlan> {
    fn select_statement_to_plan(&self, select: &SelectStatement) -> Result<LogicalPlan> {
        let mut plans = self.plan_from_tables(&select.from)?;

        let ctx = ctx
        let ctx = Context::new(select_statement_info(select)?)
            .with_timezone(select.timezone)
            .with_group_by_fill(select)
            .with_is_aggregate(
                has_aggregate_exprs(&select.fields)
                    || (select.group_by.is_some()
                        && select.group_by.as_ref().unwrap().time_dimension().is_some()),
            );
            .with_group_by_fill(select);

        // The `time` column is always present in the result set
        let mut fields = if find_time_column_index(&select.fields).is_none() {
@@ -402,8 +384,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
            },
        )?;

        // true if the input plan is the UNION, indicating
        // the result set produces multiple tables or measurements.
        // The UNION operator indicates the result set produces multiple tables or measurements.
        let is_multiple_measurements = matches!(plan, LogicalPlan::Union(_));

        let plan = plan_with_sort(
@@ -483,47 +464,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
        group_by_tag_set: &[&str],
        schemas: &Schemas,
    ) -> Result<(LogicalPlan, Vec<Expr>)> {
        if !ctx.is_aggregate {
        if !ctx.is_aggregate() {
            return Ok((input, select_exprs));
        }

        // This section identifies the time column index and updates the time expression
        // based on the semantics of the projection.
        let time_column_index = {
            let Some(time_column_index) = find_time_column_index(fields) else {
                return Err(DataFusionError::Internal("unable to find time column".to_owned()))
            };

            // Take ownership of the alias, so we don't reallocate, and temporarily place a literal
            // `NULL` in its place.
            let Expr::Alias(_, alias) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else {
                return Err(DataFusionError::External("internal: time column is not an alias".into()))
            };

            // Determine whether the query is projecting the time column or binning the time and
            // rewrite the time column expression.
            select_exprs[time_column_index] =
                if let Some(dim) = ctx.group_by.and_then(|gb| gb.time_dimension()) {
                    let stride = expr_to_df_interval_dt(&dim.interval)?;
                    let offset = if let Some(offset) = &dim.offset {
                        duration_expr_to_nanoseconds(offset)?
                    } else {
                        0
                    };

                    date_bin(
                        stride,
                        "time".as_expr(),
                        lit(ScalarValue::TimestampNanosecond(Some(offset), None)),
                    )
                } else {
                    lit_timestamp_nano(0)
                }
                .alias(alias);

            time_column_index
        };

        // Find a list of unique aggregate expressions from the projection.
        //
        // For example, a projection such as:
@@ -541,6 +485,61 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
        // table.
        let aggr_exprs = find_aggregate_exprs(&select_exprs);

        // This block identifies the time column index and updates the time expression
        // based on the semantics of the projection.
        let time_column_index = {
            let Some(time_column_index) = find_time_column_index(fields) else {
                return error::internal("unable to find time column")
            };

            // Take ownership of the alias, so we don't reallocate, and temporarily place a literal
            // `NULL` in its place.
            let Expr::Alias(_, alias) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else {
                return error::internal("time column is not an alias")
            };

            // Determine whether the query is projecting the time column or binning the time and
            // rewrite the time column expression.
            select_exprs[time_column_index] = if let Some(dim) = ctx.group_by.and_then(|gb| gb.time_dimension()) {
                let stride = expr_to_df_interval_dt(&dim.interval)?;
                let offset = if let Some(offset) = &dim.offset {
                    duration_expr_to_nanoseconds(offset)?
                } else {
                    0
                };

                date_bin(
                    stride,
                    "time".as_expr(),
                    lit(ScalarValue::TimestampNanosecond(Some(offset), None)),
                )
            } else if let ProjectionType::Selector { has_fields } =
                ctx.info.projection_type
            {
                if has_fields {
                    return error::not_implemented("projections with a single selector and fields: See https://github.com/influxdata/influxdb_iox/issues/7533");
                }

                let selector = match aggr_exprs.len() {
                    1 => aggr_exprs[0].clone(),
                    len => {
                        // Should have been validated by `select_statement_info`
                        return error::internal(format!("internal: expected 1 selector expression, got {len}"));
                    }
                };

                Expr::GetIndexedField(GetIndexedField {
                    expr: Box::new(selector),
                    key: ScalarValue::Utf8(Some("time".to_owned())),
                })
            } else {
                lit_timestamp_nano(0)
            }
            .alias(alias);

            time_column_index
        };

        let aggr_group_by_exprs = if let Some(group_by) = ctx.group_by {
            let mut group_by_exprs = Vec::new();

@@ -601,9 +600,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
                _ => {
                    // The InfluxQL planner adds the `date_bin` function,
                    // so this condition represents an internal failure.
                    return Err(DataFusionError::Internal(
                        "expected DATE_BIN function".to_owned(),
                    ));
                    return error::internal("expected DATE_BIN function");
                }
            };

@@ -1107,6 +1104,16 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
                ))),
                }
            }
            "first" => {
                let expr = self.expr_to_df_expr(ctx, &args[0], schemas)?;
                match &expr {
                    Expr::Literal(ScalarValue::Null) => Ok(expr),
                    _ => Ok(Expr::GetIndexedField(GetIndexedField {
                        expr: Box::new(struct_selector_first().call(vec![expr, "time".as_expr()])),
                        key: ScalarValue::Utf8(Some("value".to_owned())),
                    })),
                }
            }
            "last" => {
                let expr = self.expr_to_df_expr(ctx, &args[0], schemas)?;
                match &expr {
@@ -1117,7 +1124,27 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
                    })),
                }
            }
            _ => Err(DataFusionError::Plan(format!("Invalid function '{name}'"))),
            "max" => {
                let expr = self.expr_to_df_expr(ctx, &args[0], schemas)?;
                match &expr {
                    Expr::Literal(ScalarValue::Null) => Ok(expr),
                    _ => Ok(Expr::GetIndexedField(GetIndexedField {
                        expr: Box::new(struct_selector_max().call(vec![expr, "time".as_expr()])),
                        key: ScalarValue::Utf8(Some("value".to_owned())),
                    })),
                }
            }
            "min" => {
                let expr = self.expr_to_df_expr(ctx, &args[0], schemas)?;
                match &expr {
                    Expr::Literal(ScalarValue::Null) => Ok(expr),
                    _ => Ok(Expr::GetIndexedField(GetIndexedField {
                        expr: Box::new(struct_selector_min().call(vec![expr, "time".as_expr()])),
                        key: ScalarValue::Utf8(Some("value".to_owned())),
                    })),
                }
            }
            _ => error::query(format!("Invalid function '{name}'")),
        }
    }

@@ -1576,11 +1603,6 @@ fn plan_with_metadata(plan: LogicalPlan, metadata: &InfluxQlMetadata) -> Result<
    set_schema(&plan, metadata)
}

/// Returns `true` if any expressions refer to an aggregate function.
fn has_aggregate_exprs(fields: &FieldList) -> bool {
    fields.iter().any(is_aggregate_field)
}

/// A utility function that checks whether `f` is an
/// aggregate field or not. An aggregate field is one that contains at least one
/// call to an aggregate function.
@@ -1799,7 +1821,7 @@ fn find_expr(cond: &ConditionalExpression) -> Result<&IQLExpr> {
#[cfg(test)]
mod test {
    use super::*;
    use crate::plan::test_utils::{parse_select, MockSchemaProvider};
    use crate::plan::test_utils::MockSchemaProvider;
    use influxdb_influxql_parser::parse_statements;
    use insta::assert_snapshot;
    use schema::SchemaBuilder;
@@ -2953,34 +2975,4 @@ mod test {
            assert_snapshot!(plan("SELECT time, f64_field, i64_Field FROM data"));
        }
    }

    #[test]
    fn test_has_aggregate_exprs() {
        let sel = parse_select("SELECT count(usage) FROM cpu");
        assert!(has_aggregate_exprs(&sel.fields));

        // Can be part of a complex expression
        let sel = parse_select("SELECT sum(usage) + count(usage) FROM cpu");
        assert!(has_aggregate_exprs(&sel.fields));

        // Can be mixed with scalar columns
        let sel = parse_select("SELECT idle, first(usage) FROM cpu");
        assert!(has_aggregate_exprs(&sel.fields));

        // Are case insensitive
        let sel = parse_select("SELECT Count(usage) FROM cpu");
        assert!(has_aggregate_exprs(&sel.fields));

        // Returns false where it is not a valid aggregate function
        let sel = parse_select("SELECT foo(usage) FROM cpu");
        assert!(!has_aggregate_exprs(&sel.fields));

        // Returns false when it is a math function
        let sel = parse_select("SELECT abs(usage) FROM cpu");
        assert!(!has_aggregate_exprs(&sel.fields));

        // Returns false when there are only scalar functions
        let sel = parse_select("SELECT usage, idle FROM cpu");
        assert!(!has_aggregate_exprs(&sel.fields));
    }
}
@@ -1196,9 +1196,10 @@ impl FieldChecker {
    }
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[derive(Default, Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum ProjectionType {
    /// A query that projects no aggregate or selector functions.
    #[default]
    Raw,
    /// A query that projects one or more aggregate functions or
    /// two or more selector functions.
@@ -1213,8 +1214,11 @@ pub(crate) enum ProjectionType {
    TopBottomSelector,
}

#[derive(Debug)]
/// Holds high-level information as the result of analysing
/// a `SELECT` query.
#[derive(Default, Debug, Copy, Clone)]
pub(crate) struct SelectStatementInfo {
    /// Identifies the projection type for the `SELECT` query.
    pub projection_type: ProjectionType,
}

@@ -1227,8 +1231,6 @@ pub(crate) struct SelectStatementInfo {
///
/// * All aggregate, selector and window-like functions, such as `sum`, `last` or `difference`,
///   specify a field expression as their first argument
/// * Wildcard and regular expression expansion is not valid for binary expressions, such as
///   `SELECT COUNT(*) + SUM(*) FROM cpu`
/// * All projected columns must refer to a field or tag ensuring there are no literal
///   projections such as `SELECT 1`
/// * Argument types and values are valid
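As a rough illustration of how the planner consumes this analysis (a sketch, not the diff's code): `select_statement_info` produces a `SelectStatementInfo`, and the planner branches on its `projection_type`, funnelling the unsupported selector-plus-fields case through `error::not_implemented`. The `classify` function, its message, and the `SelectStatement` import path are illustrative assumptions.

```rust
use crate::plan::error;
use crate::plan::rewriter::{select_statement_info, ProjectionType, SelectStatementInfo};
use datafusion::common::Result;
use influxdb_influxql_parser::select::SelectStatement;

/// Illustrative routing on the analysis result; not a verbatim copy of the planner.
fn classify(select: &SelectStatement) -> Result<&'static str> {
    let info: SelectStatementInfo = select_statement_info(select)?;
    match info.projection_type {
        ProjectionType::Raw => Ok("raw projection"),
        ProjectionType::Selector { has_fields: false } => Ok("single selector"),
        ProjectionType::Selector { has_fields: true } => {
            error::not_implemented("projections with a single selector and fields")
        }
        // Aggregates, multiple selectors, TOP/BOTTOM, etc.
        _ => Ok("aggregate projection"),
    }
}
```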