chore: Extract interval duration and offset from `TIME`

pull/24376/head
Stuart Carnie 2023-06-08 12:17:45 +10:00
parent b14d244a5d
commit 7bd2a7bfdb
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
5 changed files with 128 additions and 44 deletions

View File

@ -390,8 +390,7 @@ impl TimeRange {
} }
/// Simplifies an InfluxQL duration `expr` to a nanosecond interval represented as an `i64`. /// Simplifies an InfluxQL duration `expr` to a nanosecond interval represented as an `i64`.
pub fn duration_expr_to_nanoseconds(expr: &Expr) -> Result<i64, ExprError> { pub fn duration_expr_to_nanoseconds(ctx: &ReduceContext, expr: &Expr) -> Result<i64, ExprError> {
let ctx = ReduceContext::default();
match reduce_expr(&ctx, expr)? { match reduce_expr(&ctx, expr)? {
Expr::Literal(Literal::Duration(v)) => Ok(*v), Expr::Literal(Literal::Duration(v)) => Ok(*v),
Expr::Literal(Literal::Float(v)) => Ok(v as i64), Expr::Literal(Literal::Float(v)) => Ok(v as i64),
@ -1020,7 +1019,7 @@ mod test {
.expr() .expr()
.unwrap() .unwrap()
.clone(); .clone();
duration_expr_to_nanoseconds(&expr) duration_expr_to_nanoseconds(&ReduceContext::default(), &expr)
} }
let cases = vec![ let cases = vec![

View File

@ -33,6 +33,18 @@ pub(super) struct Select {
/// The projection type of the selection. /// The projection type of the selection.
pub(super) projection_type: ProjectionType, pub(super) projection_type: ProjectionType,
/// The interval derived from the arguments to the `TIME` function
/// when a `GROUP BY` clause is declared with `TIME`.
pub(super) interval: Option<Interval>,
/// The number of additional intervals that must be read
/// for queries that group by time and use window functions such as
/// `DIFFERENCE` or `DERIVATIVE`. This ensures data for the first
/// window is available.
///
/// See: <https://github.com/influxdata/influxdb/blob/f365bb7e3a9c5e227dbf66d84adf674d3d127176/query/compile.go#L50>
pub(super) extra_intervals: usize,
/// Projection clause of the selection. /// Projection clause of the selection.
pub(super) fields: Vec<Field>, pub(super) fields: Vec<Field>,
@ -194,3 +206,15 @@ impl Display for Field {
write!(f, " AS {}", self.name) write!(f, " AS {}", self.name)
} }
} }
/// Represents the interval duration and offset
/// derived from the `TIME` function when specified
/// in a `GROUP BY` clause.
#[derive(Debug, Clone, Copy)]
pub(super) struct Interval {
/// The nanosecond duration of the interval
pub duration: i64,
/// The nanosecond offset of the interval.
pub offset: Option<i64>,
}

View File

@ -1,10 +1,10 @@
mod select; mod select;
use crate::plan::ir::{DataSource, Field, Select, SelectQuery}; use crate::plan::ir::{DataSource, Field, Interval, Select, SelectQuery};
use crate::plan::planner::select::{ use crate::plan::planner::select::{
fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort, ProjectionInfo, fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort, ProjectionInfo,
}; };
use crate::plan::planner_time_range_expression::{expr_to_df_interval_dt, time_range_to_df_expr}; use crate::plan::planner_time_range_expression::time_range_to_df_expr;
use crate::plan::rewriter::{find_table_names, rewrite_statement, ProjectionType}; use crate::plan::rewriter::{find_table_names, rewrite_statement, ProjectionType};
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas}; use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
use crate::plan::var_ref::var_ref_data_type_to_data_type; use crate::plan::var_ref::var_ref_data_type_to_data_type;
@ -50,9 +50,7 @@ use influxdb_influxql_parser::show_measurements::{
use influxdb_influxql_parser::show_tag_keys::ShowTagKeysStatement; use influxdb_influxql_parser::show_tag_keys::ShowTagKeysStatement;
use influxdb_influxql_parser::show_tag_values::{ShowTagValuesStatement, WithKeyClause}; use influxdb_influxql_parser::show_tag_values::{ShowTagValuesStatement, WithKeyClause};
use influxdb_influxql_parser::simple_from_clause::ShowFromClause; use influxdb_influxql_parser::simple_from_clause::ShowFromClause;
use influxdb_influxql_parser::time_range::{ use influxdb_influxql_parser::time_range::{split_cond, ReduceContext, TimeRange};
duration_expr_to_nanoseconds, split_cond, ReduceContext, TimeRange,
};
use influxdb_influxql_parser::timestamp::Timestamp; use influxdb_influxql_parser::timestamp::Timestamp;
use influxdb_influxql_parser::{ use influxdb_influxql_parser::{
common::{MeasurementName, WhereClause}, common::{MeasurementName, WhereClause},
@ -145,6 +143,8 @@ struct Context<'a> {
// GROUP BY information // GROUP BY information
group_by: Option<&'a GroupByClause>, group_by: Option<&'a GroupByClause>,
fill: Option<FillClause>, fill: Option<FillClause>,
/// Interval of the `TIME` function
interval: Option<Interval>,
/// The set of tags specified in the top-level `SELECT` statement /// The set of tags specified in the top-level `SELECT` statement
/// which represent the tag set used for grouping output. /// which represent the tag set used for grouping output.
@ -165,6 +165,7 @@ impl<'a> Context<'a> {
time_range: select.time_range, time_range: select.time_range,
group_by: select.group_by.as_ref(), group_by: select.group_by.as_ref(),
fill: select.fill, fill: select.fill,
interval: select.interval,
root_group_by_tags, root_group_by_tags,
} }
} }
@ -183,6 +184,7 @@ impl<'a> Context<'a> {
time_range: select.time_range.intersected(self.time_range), time_range: select.time_range.intersected(self.time_range),
group_by: select.group_by.as_ref(), group_by: select.group_by.as_ref(),
fill: select.fill, fill: select.fill,
interval: select.interval,
root_group_by_tags: self.root_group_by_tags, root_group_by_tags: self.root_group_by_tags,
} }
} }
@ -638,13 +640,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// 1. is binning by time, project the column using the `DATE_BIN` function, // 1. is binning by time, project the column using the `DATE_BIN` function,
// 2. is a single-selector query, project the `time` field of the selector aggregate, // 2. is a single-selector query, project the `time` field of the selector aggregate,
// 3. otherwise, project the Unix epoch (0) // 3. otherwise, project the Unix epoch (0)
select_exprs[time_column_index] = if let Some(dim) = ctx.group_by.and_then(|gb| gb.time_dimension()) { select_exprs[time_column_index] = if let Some(i) = ctx.interval {
let stride = expr_to_df_interval_dt(&dim.interval)?; let stride = lit(ScalarValue::new_interval_mdn(0, 0, i.duration));
let offset = if let Some(offset) = &dim.offset { let offset = i.offset.map_or(0, |v|v);
duration_expr_to_nanoseconds(offset).map_err(error::map::expr_error)?
} else {
0
};
date_bin( date_bin(
stride, stride,

View File

@ -1,20 +1,8 @@
//! APIs for transforming InfluxQL [expressions][influxdb_influxql_parser::expression::Expr]. //! APIs for transforming InfluxQL [expressions][influxdb_influxql_parser::expression::Expr].
use crate::plan::error; use datafusion::common::ScalarValue;
use datafusion::common::{Result, ScalarValue};
use datafusion::logical_expr::{lit, Expr as DFExpr}; use datafusion::logical_expr::{lit, Expr as DFExpr};
use datafusion_util::AsExpr; use datafusion_util::AsExpr;
use influxdb_influxql_parser::expression::Expr; use influxdb_influxql_parser::time_range::TimeRange;
use influxdb_influxql_parser::time_range::{duration_expr_to_nanoseconds, TimeRange};
type ExprResult = Result<DFExpr>;
/// Simplifies `expr` to an InfluxQL duration and returns a DataFusion interval.
///
/// Returns an error if `expr` is not a duration expression.
pub(super) fn expr_to_df_interval_dt(expr: &Expr) -> ExprResult {
let ns = duration_expr_to_nanoseconds(expr).map_err(error::map::expr_error)?;
Ok(lit(ScalarValue::new_interval_mdn(0, 0, ns)))
}
fn lower_bound_to_df_expr(v: Option<i64>) -> Option<DFExpr> { fn lower_bound_to_df_expr(v: Option<i64>) -> Option<DFExpr> {
v.map(|ts| { v.map(|ts| {

View File

@ -1,7 +1,7 @@
use crate::plan::expr_type_evaluator::TypeEvaluator; use crate::plan::expr_type_evaluator::TypeEvaluator;
use crate::plan::field::{field_by_name, field_name}; use crate::plan::field::{field_by_name, field_name};
use crate::plan::field_mapper::{field_and_dimensions, FieldTypeMap}; use crate::plan::field_mapper::{field_and_dimensions, FieldTypeMap};
use crate::plan::ir::{DataSource, Field, Select, SelectQuery, TagSet}; use crate::plan::ir::{DataSource, Field, Interval, Select, SelectQuery, TagSet};
use crate::plan::var_ref::{influx_type_to_var_ref_data_type, var_ref_data_type_to_influx_type}; use crate::plan::var_ref::{influx_type_to_var_ref_data_type, var_ref_data_type_to_influx_type};
use crate::plan::{error, util, SchemaProvider}; use crate::plan::{error, util, SchemaProvider};
use datafusion::common::{DataFusionError, Result}; use datafusion::common::{DataFusionError, Result};
@ -19,7 +19,9 @@ use influxdb_influxql_parser::select::{
Dimension, FillClause, FromMeasurementClause, GroupByClause, MeasurementSelection, Dimension, FillClause, FromMeasurementClause, GroupByClause, MeasurementSelection,
SelectStatement, SelectStatement,
}; };
use influxdb_influxql_parser::time_range::{split_cond, ReduceContext, TimeRange}; use influxdb_influxql_parser::time_range::{
duration_expr_to_nanoseconds, split_cond, ReduceContext, TimeRange,
};
use influxdb_influxql_parser::timestamp::Timestamp; use influxdb_influxql_parser::timestamp::Timestamp;
use itertools::Itertools; use itertools::Itertools;
use schema::InfluxColumnType; use schema::InfluxColumnType;
@ -100,21 +102,24 @@ impl RewriteSelect {
let (fields, group_by) = self.expand_projection(s, stmt, &from, &tag_set)?; let (fields, group_by) = self.expand_projection(s, stmt, &from, &tag_set)?;
let condition = self.condition_resolve_types(s, stmt, &from)?; let condition = self.condition_resolve_types(s, stmt, &from)?;
let rc = ReduceContext {
now: Some(Timestamp::from(
s.execution_props().query_execution_start_time,
)),
tz: stmt.timezone.map(|tz| *tz),
};
let interval = self.find_interval_offset(&rc, group_by.as_ref())?;
let (condition, time_range) = match condition { let (condition, time_range) = match condition {
Some(where_clause) => { Some(where_clause) => split_cond(&rc, &where_clause).map_err(error::map::expr_error)?,
let rc = ReduceContext {
now: Some(Timestamp::from(
s.execution_props().query_execution_start_time,
)),
tz: stmt.timezone.map(|tz| *tz),
};
split_cond(&rc, &where_clause).map_err(error::map::expr_error)?
}
None => (None, TimeRange::default()), None => (None, TimeRange::default()),
}; };
let SelectStatementInfo { projection_type } = let SelectStatementInfo {
select_statement_info(&fields, &group_by, stmt.fill)?; projection_type,
extra_intervals,
} = select_statement_info(&fields, &group_by, stmt.fill)?;
// Following InfluxQL OG behaviour, if this is a subquery, and the fill strategy equates // Following InfluxQL OG behaviour, if this is a subquery, and the fill strategy equates
// to `FILL(null)`, switch to `FILL(none)`. // to `FILL(null)`, switch to `FILL(none)`.
@ -131,6 +136,8 @@ impl RewriteSelect {
Ok(Select { Ok(Select {
projection_type, projection_type,
interval,
extra_intervals,
fields, fields,
from, from,
condition, condition,
@ -388,6 +395,29 @@ impl RewriteSelect {
Ok(Some(where_clause)) Ok(Some(where_clause))
} }
} }
/// Return the interval value of the `GROUP BY` clause if it specifies a `TIME`.
fn find_interval_offset(
&self,
ctx: &ReduceContext,
group_by: Option<&GroupByClause>,
) -> Result<Option<Interval>> {
Ok(
if let Some(td) = group_by.and_then(|v| v.time_dimension()) {
let duration = duration_expr_to_nanoseconds(ctx, &td.interval)
.map_err(error::map::expr_error)?;
let offset = td
.offset
.as_ref()
.map(|o| duration_expr_to_nanoseconds(ctx, o))
.transpose()
.map_err(error::map::expr_error)?;
Some(Interval { duration, offset })
} else {
None
},
)
}
} }
/// Ensures the `time` column is presented consistently across all `SELECT` queries. /// Ensures the `time` column is presented consistently across all `SELECT` queries.
@ -865,12 +895,30 @@ macro_rules! lit_string {
}; };
} }
/// Set the `extra_intervals` field of [`FieldChecker`] if it is
/// less than then proposed new value.
macro_rules! set_extra_intervals {
($SELF:expr, $NEW:expr) => {
if $SELF.extra_intervals < $NEW as usize {
$SELF.extra_intervals = $NEW as usize
}
};
}
/// Checks a number of expectations for the fields of a [`SelectStatement`]. /// Checks a number of expectations for the fields of a [`SelectStatement`].
#[derive(Default)] #[derive(Default)]
struct FieldChecker { struct FieldChecker {
/// `true` if the statement contains a `GROUP BY TIME` clause. /// `true` if the statement contains a `GROUP BY TIME` clause.
has_group_by_time: bool, has_group_by_time: bool,
/// The number of additional intervals that must be read
/// for queries that group by time and use window functions such as
/// `DIFFERENCE` or `DERIVATIVE`. This ensures data for the first
/// window is available.
///
/// See: <https://github.com/influxdata/influxdb/blob/f365bb7e3a9c5e227dbf66d84adf674d3d127176/query/compile.go#L50>
extra_intervals: usize,
/// `true` if the interval was inherited by a parent. /// `true` if the interval was inherited by a parent.
/// If this is set, then an interval that was inherited will not cause /// If this is set, then an interval that was inherited will not cause
/// a query that shouldn't have an interval to fail. /// a query that shouldn't have an interval to fail.
@ -1198,6 +1246,9 @@ impl FieldChecker {
self.inc_aggregate_count(); self.inc_aggregate_count();
check_exp_args!(name, 1, 2, args); check_exp_args!(name, 1, 2, args);
set_extra_intervals!(self, 1);
match args.get(1) { match args.get(1) {
Some(Expr::Literal(Literal::Duration(d))) if **d <= 0 => { Some(Expr::Literal(Literal::Duration(d))) if **d <= 0 => {
return error::query(format!("duration argument must be positive, got {d}")) return error::query(format!("duration argument must be positive, got {d}"))
@ -1217,6 +1268,8 @@ impl FieldChecker {
self.inc_aggregate_count(); self.inc_aggregate_count();
check_exp_args!(name, 1, 2, args); check_exp_args!(name, 1, 2, args);
set_extra_intervals!(self, 1);
match args.get(1) { match args.get(1) {
Some(Expr::Literal(Literal::Duration(d))) if **d <= 0 => { Some(Expr::Literal(Literal::Duration(d))) if **d <= 0 => {
return error::query(format!("duration argument must be positive, got {d}")) return error::query(format!("duration argument must be positive, got {d}"))
@ -1236,6 +1289,8 @@ impl FieldChecker {
self.inc_aggregate_count(); self.inc_aggregate_count();
check_exp_args!(name, 1, args); check_exp_args!(name, 1, args);
set_extra_intervals!(self, 1);
self.check_nested_symbol(name, &args[0]) self.check_nested_symbol(name, &args[0])
} }
@ -1243,6 +1298,8 @@ impl FieldChecker {
self.inc_aggregate_count(); self.inc_aggregate_count();
check_exp_args!("cumulative_sum", 1, args); check_exp_args!("cumulative_sum", 1, args);
set_extra_intervals!(self, 1);
self.check_nested_symbol("cumulative_sum", &args[0]) self.check_nested_symbol("cumulative_sum", &args[0])
} }
@ -1257,6 +1314,8 @@ impl FieldChecker {
)); ));
} }
set_extra_intervals!(self, v);
self.check_nested_symbol("moving_average", &args[0]) self.check_nested_symbol("moving_average", &args[0])
} }
@ -1269,6 +1328,8 @@ impl FieldChecker {
return error::query(format!("{name} period must be greater than 1, got {v}")); return error::query(format!("{name} period must be greater than 1, got {v}"));
} }
set_extra_intervals!(self, v);
if let Some(v) = lit_integer!(name, args, 2?) { if let Some(v) = lit_integer!(name, args, 2?) {
match (v, name) { match (v, name) {
(v, "triple_exponential_derivative") if v < 1 && v != -1 => { (v, "triple_exponential_derivative") if v < 1 && v != -1 => {
@ -1307,6 +1368,8 @@ impl FieldChecker {
return error::query(format!("{name} period must be greater than 1, got {v}")); return error::query(format!("{name} period must be greater than 1, got {v}"));
} }
set_extra_intervals!(self, v);
if let Some(v) = lit_integer!(name, args, 2?) { if let Some(v) = lit_integer!(name, args, 2?) {
if v < 0 && v != -1 { if v < 0 && v != -1 {
return error::query(format!( return error::query(format!(
@ -1327,6 +1390,8 @@ impl FieldChecker {
return error::query(format!("{name} period must be greater than 1, got {v}")); return error::query(format!("{name} period must be greater than 1, got {v}"));
} }
set_extra_intervals!(self, v);
if let Some(v) = lit_integer!(name, args, 2?) { if let Some(v) = lit_integer!(name, args, 2?) {
if v < 0 && v != -1 { if v < 0 && v != -1 {
return error::query(format!( return error::query(format!(
@ -1468,7 +1533,11 @@ pub(crate) enum ProjectionType {
#[derive(Default, Debug, Copy, Clone)] #[derive(Default, Debug, Copy, Clone)]
struct SelectStatementInfo { struct SelectStatementInfo {
/// Identifies the projection type for the `SELECT` query. /// Identifies the projection type for the `SELECT` query.
pub projection_type: ProjectionType, projection_type: ProjectionType,
/// Copied from [extra_intervals](FieldChecker::extra_intervals)
///
/// [See also](Select::extra_intervals).
extra_intervals: usize,
} }
/// Gather information about the semantics of a [`SelectStatement`] and verify /// Gather information about the semantics of a [`SelectStatement`] and verify
@ -1518,8 +1587,14 @@ fn select_statement_info(
}; };
let projection_type = fc.check_fields(fields, fill)?; let projection_type = fc.check_fields(fields, fill)?;
let FieldChecker {
extra_intervals, ..
} = fc;
Ok(SelectStatementInfo { projection_type }) Ok(SelectStatementInfo {
projection_type,
extra_intervals,
})
} }
#[cfg(test)] #[cfg(test)]