From b9479a2b3c996af9066cff4ae460de33542d934a Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Sun, 9 Apr 2023 07:22:54 +1000 Subject: [PATCH] refactor: `limit` function supports multiple measurements and tag sets (#7464) * fix: Add sort operator after window aggregate operator Closes #7460 * fix: Refactor `LIMIT` and `OFFSET` implementation These changes should allow the `limit` function to be used generically with any plan following the same conventions. * chore: No need to reorder this * chore: Add documentation to the `limit` function --- .../query_tests2/cases/in/issue_6112.influxql | 4 +- .../cases/in/issue_6112.influxql.expected | 15 + influxdb_iox/tests/query_tests2/setups.rs | 10 + iox_query_influxql/src/plan/planner.rs | 472 +++++++++--------- iox_query_influxql/src/plan/planner/select.rs | 9 +- 5 files changed, 266 insertions(+), 244 deletions(-) diff --git a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql index f0437f55cf..7d0935c1aa 100644 --- a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql +++ b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql @@ -389,6 +389,9 @@ SELECT usage_idle FROM cpu WHERE cpu = 'cpu-total' LIMIT 2; SELECT usage_idle FROM cpu WHERE cpu = 'cpu-total' ORDER BY DESC LIMIT 2; SELECT usage_idle FROM cpu WHERE cpu = 'cpu-total' LIMIT 2 OFFSET 1; SELECT usage_idle FROM cpu WHERE cpu = 'cpu-total' ORDER BY DESC LIMIT 2 OFFSET 1; +-- correct final ordering +-- See: https://github.com/influxdata/influxdb_iox/issues/7460 +SELECT f64 FROM m2 GROUP BY tag0 LIMIT 1; -- raw query, multiple measurements SELECT usage_idle, bytes_free FROM cpu, disk WHERE cpu = 'cpu-total' OR device = 'disk1s1' LIMIT 2; @@ -428,7 +431,6 @@ SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-1 -- aggregate, group by TIME and tag, multiple measurements SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device LIMIT 1; - -- -- Unimplemented cases -- diff --git a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected index 19c4a7cb3f..2c237775a2 100644 --- a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected +++ b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected @@ -1080,6 +1080,21 @@ Error while planning query: Error during planning: mixing aggregate and non-aggr +------------------+----------------------+------------+ | cpu | 2022-10-31T02:00:00Z | 2.98 | +------------------+----------------------+------------+ +-- InfluxQL: SELECT f64 FROM m2 GROUP BY tag0 LIMIT 1; ++------------------+----------------------+-------+------+ +| iox::measurement | time | tag0 | f64 | ++------------------+----------------------+-------+------+ +| m2 | 2022-10-31T02:00:00Z | val00 | 0.98 | +| m2 | 2022-10-31T02:00:00Z | val01 | 2.98 | +| m2 | 2022-10-31T02:00:00Z | val02 | 1.98 | +| m2 | 2022-10-31T02:00:00Z | val03 | 4.98 | +| m2 | 2022-10-31T02:00:00Z | val04 | 9.98 | +| m2 | 2022-10-31T02:00:00Z | val05 | 3.98 | +| m2 | 2022-10-31T02:00:00Z | val07 | 8.98 | +| m2 | 2022-10-31T02:00:00Z | val08 | 7.98 | +| m2 | 2022-10-31T02:00:00Z | val09 | 5.98 | +| m2 | 2022-10-31T02:00:00Z | val10 | 6.98 | ++------------------+----------------------+-------+------+ -- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk WHERE cpu = 'cpu-total' OR device = 'disk1s1' LIMIT 2; +------------------+----------------------+------------+------------+ | iox::measurement | time | usage_idle | bytes_free | diff --git a/influxdb_iox/tests/query_tests2/setups.rs b/influxdb_iox/tests/query_tests2/setups.rs index 1b6e05e398..71f79e33f3 100644 --- a/influxdb_iox/tests/query_tests2/setups.rs +++ b/influxdb_iox/tests/query_tests2/setups.rs @@ -1246,6 +1246,16 @@ pub static SETUPS: Lazy> = Lazy::new(|| { disk,host=host1,device=disk1s2 bytes_free=2239i,bytes_used=319833i 1667181610000000000 disk,host=host1,device=disk1s5 bytes_free=3234i,bytes_used=419838i 1667181600000000000 disk,host=host1,device=disk1s5 bytes_free=3239i,bytes_used=419833i 1667181610000000000 + m2,tag0=val00 f64=0.98 1667181600000000000 + m2,tag0=val02 f64=1.98 1667181600000000000 + m2,tag0=val01 f64=2.98 1667181600000000000 + m2,tag0=val05 f64=3.98 1667181600000000000 + m2,tag0=val03 f64=4.98 1667181600000000000 + m2,tag0=val09 f64=5.98 1667181600000000000 + m2,tag0=val10 f64=6.98 1667181600000000000 + m2,tag0=val08 f64=7.98 1667181600000000000 + m2,tag0=val07 f64=8.98 1667181600000000000 + m2,tag0=val04 f64=9.98 1667181600000000000 "# .to_string(), ), diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index f20e2bd76f..b1c9df06db 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -29,6 +29,7 @@ use datafusion::logical_expr::{ }; use datafusion_util::{lit_dict, AsExpr}; use generated_types::influxdata::iox::querier::v1::InfluxQlMetadata; +use influxdb_influxql_parser::common::{LimitClause, OffsetClause}; use influxdb_influxql_parser::explain::{ExplainOption, ExplainStatement}; use influxdb_influxql_parser::expression::walk::walk_expr; use influxdb_influxql_parser::expression::{ @@ -39,7 +40,7 @@ use influxdb_influxql_parser::select::{ FillClause, GroupByClause, SLimitClause, SOffsetClause, TimeZoneClause, }; use influxdb_influxql_parser::{ - common::{LimitClause, MeasurementName, OffsetClause, WhereClause}, + common::{MeasurementName, WhereClause}, expression::Expr as IQLExpr, identifier::Identifier, literal::Literal, @@ -57,6 +58,7 @@ use schema::{ }; use std::collections::{HashSet, VecDeque}; use std::fmt::Debug; +use std::iter; use std::ops::{Bound, ControlFlow, Deref, Range}; use std::str::FromStr; use std::sync::Arc; @@ -335,7 +337,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> { select, &fields, &group_by_tag_set, - &projection_tag_set, )? { // Exclude any plans that produce no data, which is // consistent with InfluxQL. @@ -352,15 +353,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { // UNION the remaining plans let plan = plans.into_iter().try_fold(plan, |prev, (next, proj)| { - let next = self.project_select( - &ctx, - next, - proj, - select, - &fields, - &group_by_tag_set, - &projection_tag_set, - )?; + let next = self.project_select(&ctx, next, proj, select, &fields, &group_by_tag_set)?; if let LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row: false, .. @@ -386,20 +379,33 @@ impl<'a> InfluxQLToLogicalPlan<'a> { }, )?; - let plan = if let LogicalPlan::Union(_) = plan { - // If the result set is a Union and therefore produces multiple measurements, they must - // be sorted by measurement in ascending order. - plan_with_sort(plan, select, &group_by_tag_set, &projection_tag_set)? - } else { - plan - }; + // true if the input plan is the UNION, indicating + // the result set produces multiple tables or measurements. + let is_multiple_measurements = matches!(plan, LogicalPlan::Union(_)); + + let plan = plan_with_sort( + plan, + select.order_by.to_sort_expr(), + is_multiple_measurements, + &group_by_tag_set, + &projection_tag_set, + )?; + + let plan = self.limit( + plan, + select.offset, + select.limit, + select.order_by.to_sort_expr(), + is_multiple_measurements, + &group_by_tag_set, + &projection_tag_set, + )?; let plan = self.slimit(plan, select.series_offset, select.series_limit)?; Ok(plan) } - #[allow(clippy::too_many_arguments)] fn project_select( &self, ctx: &Context<'_>, @@ -408,7 +414,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> { select: &SelectStatement, fields: &[Field], group_by_tag_set: &[&str], - projection_tag_set: &[&str], ) -> Result { let schemas = Schemas::new(input.schema())?; @@ -440,110 +445,11 @@ impl<'a> InfluxQLToLogicalPlan<'a> { let (plan, select_exprs_post_aggr) = self.select_aggregate(ctx, plan, fields, select_exprs, group_by_tag_set, &schemas)?; - // projection expressions includes the `iox::measurement` column - let proj_exprs = proj - .into_iter() - .chain(select_exprs_post_aggr.into_iter()) - .collect::>(); - // Wrap the plan in a `LogicalPlan::Projection` from the select expressions - let plan = project(plan, proj_exprs.clone())?; - - let plan = plan_with_sort(plan, select, group_by_tag_set, projection_tag_set)?; - - if select.offset.is_some() || select.limit.is_some() { - if group_by_tag_set.is_empty() { - // If the query is not grouping by tags, the - // DataFusion Limit operator is sufficient. - self.limit(plan, select.offset, select.limit) - } else { - // If the query includes a GROUP BY tag[, tag, ...], the LIMIT and OFFSET clauses - // are applied to each unique group. To accomplish this, construct a plan which uses - // the ROW_NUMBER windowing function. - - // The name of the ROW_NUMBER window expression - const IOX_ROW_ALIAS: &str = "iox::row"; - - // Construct a ROW_NUMBER window expression: - // - // ROW_NUMBER() OVER ( - // PARTITION BY [group_by_tag_set] - // ORDER BY time [ASC | DESC] - // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - // ) AS iox::row - let window_func_exprs = vec![Expr::WindowFunction(WindowFunction { - fun: window_function::WindowFunction::BuiltInWindowFunction( - BuiltInWindowFunction::RowNumber, - ), - args: vec![], - partition_by: fields_to_exprs_no_nulls(plan.schema(), group_by_tag_set) - .collect::>(), - order_by: vec![select.order_by.to_sort_expr()], - window_frame: WindowFrame { - units: WindowFrameUnits::Rows, - start_bound: WindowFrameBound::Preceding(ScalarValue::Null), - end_bound: WindowFrameBound::CurrentRow, - }, - }) - .alias(IOX_ROW_ALIAS)]; - - let plan = LogicalPlanBuilder::from(plan) - .window(window_func_exprs)? - .build()?; - - // Replace any expressions that are not a column with a column referencing - // an output column from the aggregate schema. - let proj_exprs = proj_exprs - .iter() - .map(|expr| expr_as_column_expr(expr, &plan)) - .collect::>>()?; - - let limit = select - .limit - .map(|v| >::try_into(*v)) - .transpose() - .map_err(|_| DataFusionError::Plan("limit out of range".to_owned()))?; - let offset = select - .offset - .map(|v| >::try_into(*v)) - .transpose() - .map_err(|_| DataFusionError::Plan("offset out of range".to_owned()))?; - - // a reference to the ROW_NUMBER column. - let row_alias = IOX_ROW_ALIAS.as_expr(); - - let row_filter_expr = match (limit, offset) { - // WHERE "iox::row" BETWEEN OFFSET + 1 AND OFFSET + LIMIT - (Some(limit), Some(offset)) => { - let low = offset + 1; - let high = offset + limit; - - Expr::Between(Between { - expr: Box::new(row_alias), - negated: false, - low: Box::new(lit(low)), - high: Box::new(lit(high)), - }) - } - - // WHERE "iox::row" <= LIMIT - (Some(limit), None) => row_alias.lt_eq(lit(limit)), - - // WHERE "iox::row" > OFFSET - (None, Some(offset)) => row_alias.gt(lit(offset)), - (None, None) => unreachable!("limit and offset cannot not be None"), - }; - - LogicalPlanBuilder::from(plan) - // Filter by the LIMIT and OFFSET clause - .filter(row_filter_expr)? - // Project the output without the IOX_ROW_ALIAS column - .project(proj_exprs)? - .build() - } - } else { - Ok(plan) - } + project( + plan, + proj.into_iter().chain(select_exprs_post_aggr.into_iter()), + ) } fn select_aggregate( @@ -723,22 +629,138 @@ impl<'a> InfluxQLToLogicalPlan<'a> { Ok((plan, select_exprs_post_aggr)) } - /// Optionally wrap the input logical plan in a [`LogicalPlan::Limit`] node using the specified - /// `offset` and `limit`. + /// Generate a plan that partitions the input data into groups, first omitting a specified + /// number of rows, followed by restricting the quantity of rows within each group. + /// + /// ## Arguments + /// + /// - `input`: The plan to apply the limit to. + /// - `offset`: The number of input rows to skip. + /// - `limit`: The maximum number of rows to return in the output plan per group. + /// - `time_sort_expr`: An `Expr::Sort` referring to the `time` column of the input. + /// - `is_multiple_measurements`: `true` if the `input` produces multiple measurements, + /// and therefore the limit should be applied per measurement and any additional group tags. + /// - `group_by_tag_set`: Tag columns from the `input` plan that should be used to partition + /// the `input` plan and sort the `output` plan. + /// - `projection_tag_set`: Additional tag columns that should be used to sort the `output` + /// plan. + #[allow(clippy::too_many_arguments)] fn limit( &self, input: LogicalPlan, offset: Option, limit: Option, + time_sort_expr: Expr, + is_multiple_measurements: bool, + group_by_tag_set: &[&str], + projection_tag_set: &[&str], ) -> Result { if offset.is_none() && limit.is_none() { return Ok(input); } - let skip = offset.map_or(0, |v| *v as usize); - let fetch = limit.map(|v| *v as usize); + if group_by_tag_set.is_empty() && !is_multiple_measurements { + // If the query is not grouping by tags, and is a single measurement, the DataFusion + // Limit operator is sufficient. + let skip = offset.map_or(0, |v| *v as usize); + let fetch = limit.map(|v| *v as usize); - LogicalPlanBuilder::from(input).limit(skip, fetch)?.build() + LogicalPlanBuilder::from(input).limit(skip, fetch)?.build() + } else { + // If the query includes a GROUP BY tag[, tag, ...], the LIMIT and OFFSET clauses + // are applied to each unique group. To accomplish this, construct a plan which uses + // the ROW_NUMBER windowing function. + + // The name of the ROW_NUMBER window expression + const IOX_ROW_ALIAS: &str = "iox::row"; + + // Construct a ROW_NUMBER window expression: + // + // ROW_NUMBER() OVER ( + // PARTITION BY [iox::measurement, group_by_tag_set] + // ORDER BY time [ASC | DESC] + // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + // ) AS iox::row + let window_func_exprs = vec![Expr::WindowFunction(WindowFunction { + fun: window_function::WindowFunction::BuiltInWindowFunction( + BuiltInWindowFunction::RowNumber, + ), + args: vec![], + partition_by: iter::once(INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr()) + .chain(fields_to_exprs_no_nulls(input.schema(), group_by_tag_set)) + .collect::>(), + order_by: vec![time_sort_expr.clone()], + window_frame: WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::Null), + end_bound: WindowFrameBound::CurrentRow, + }, + }) + .alias(IOX_ROW_ALIAS)]; + + // Prepare new projection. + let proj_exprs = input + .schema() + .fields() + .iter() + .map(|expr| Expr::Column(expr.unqualified_column())) + .collect::>(); + + let plan = LogicalPlanBuilder::from(input) + .window(window_func_exprs)? + .build()?; + + let limit = limit + .map(|v| >::try_into(*v)) + .transpose() + .map_err(|_| DataFusionError::Plan("limit out of range".to_owned()))?; + let offset = offset + .map(|v| >::try_into(*v)) + .transpose() + .map_err(|_| DataFusionError::Plan("offset out of range".to_owned()))?; + + // a reference to the ROW_NUMBER column. + let row_alias = IOX_ROW_ALIAS.as_expr(); + + let row_filter_expr = match (limit, offset) { + // WHERE "iox::row" BETWEEN OFFSET + 1 AND OFFSET + LIMIT + (Some(limit), Some(offset)) => { + let low = offset + 1; + let high = offset + limit; + + Expr::Between(Between { + expr: Box::new(row_alias), + negated: false, + low: Box::new(lit(low)), + high: Box::new(lit(high)), + }) + } + + // WHERE "iox::row" <= LIMIT + (Some(limit), None) => row_alias.lt_eq(lit(limit)), + + // WHERE "iox::row" > OFFSET + (None, Some(offset)) => row_alias.gt(lit(offset)), + (None, None) => unreachable!("limit and offset cannot not be None"), + }; + + let plan = LogicalPlanBuilder::from(plan) + // Filter by the LIMIT and OFFSET clause + .filter(row_filter_expr)? + // Project the output without the IOX_ROW_ALIAS column + .project(proj_exprs)? + .build()?; + + // For consistency with InfluxQL, the final results must be sorted by + // the tag set from the GROUP BY + plan_with_sort( + plan, + time_sort_expr, + is_multiple_measurements, + group_by_tag_set, + projection_tag_set, + ) + } } /// Verifies the `SLIMIT` and `SOFFSET` clauses are `None`; otherwise, return a @@ -1778,14 +1800,10 @@ mod test { assert_snapshot!(plan("SELECT host, cpu, device, usage_idle, bytes_used FROM cpu, disk"), @r###" Sort: iox::measurement ASC NULLS LAST, time ASC NULLS LAST, cpu ASC NULLS LAST, device ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N] Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N] - Projection: iox::measurement, time, host, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, usage_idle, CAST(bytes_used AS Int64) AS bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N] - Sort: time ASC NULLS LAST, cpu ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_used:Null;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.host AS host, cpu.cpu AS cpu, NULL AS device, cpu.usage_idle AS usage_idle, NULL AS bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_used:Null;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] - Projection: iox::measurement, time, host, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, CAST(usage_idle AS Float64) AS usage_idle, bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N] - Sort: time ASC NULLS LAST, device ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_used:Int64;N] - Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.host AS host, NULL AS cpu, disk.device AS device, NULL AS usage_idle, disk.bytes_used AS bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_used:Int64;N] - TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.host AS host, CAST(cpu.cpu AS Utf8) AS cpu, CAST(NULL AS Utf8) AS device, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.host AS host, CAST(NULL AS Utf8) AS cpu, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Float64) AS usage_idle, disk.bytes_used AS bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] "###); // nonexistent @@ -1800,12 +1818,10 @@ mod test { assert_snapshot!(plan("SELECT host, usage_idle FROM cpu, cpu"), @r###" Sort: iox::measurement ASC NULLS LAST, time ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - Sort: time ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.host AS host, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] - Sort: time ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.host AS host, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.host AS host, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.host AS host, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); } @@ -2312,31 +2328,34 @@ mod test { #[test] fn with_limit_or_offset() { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo LIMIT 1"), @r###" - Projection: iox::measurement, time, foo, count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] - Filter: iox::row <= Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] - WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] - Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] - Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] - Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N] - TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] - "###); + Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Projection: iox::measurement, time, foo, count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Filter: iox::row <= Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] + Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo OFFSET 1"), @r###" - Projection: iox::measurement, time, foo, count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] - Filter: iox::row > Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] - WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] - Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] - Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] - Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N] - TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] - "###); + Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Projection: iox::measurement, time, foo, count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Filter: iox::row > Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] + Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo LIMIT 2 OFFSET 3"), @r###" - Projection: iox::measurement, time, foo, count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] - Filter: iox::row BETWEEN Int64(4) AND Int64(5) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] - WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] - Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] - Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] - Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N] - TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Projection: iox::measurement, time, foo, count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Filter: iox::row BETWEEN Int64(4) AND Int64(5) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N] + Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] + Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] "###); // Fallible @@ -2508,126 +2527,101 @@ mod test { assert_snapshot!(plan("SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu"), @r###" Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] - Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N] - Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, NULL AS cpu, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N] - TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] "###); // Sort should be iox::measurement, cpu, device, time assert_snapshot!(plan("SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY device, cpu"), @r###" Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_free:Null;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, NULL AS device, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_free:Null;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] - Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_free:Int64;N] - Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, NULL AS cpu, disk.device AS device, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_free:Int64;N] - TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(cpu.cpu AS Utf8) AS cpu, CAST(NULL AS Utf8) AS device, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(NULL AS Utf8) AS cpu, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] "###); // Sort should be iox::measurement, cpu, time, device assert_snapshot!(plan("SELECT device, usage_idle, bytes_free FROM cpu, disk GROUP BY cpu"), @r###" Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, time ASC NULLS LAST, device ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_free:Null;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, NULL AS device, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_free:Null;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] - Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: time ASC NULLS LAST, device ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_free:Int64;N] - Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, NULL AS cpu, disk.device AS device, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_free:Int64;N] - TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(cpu.cpu AS Utf8) AS cpu, CAST(NULL AS Utf8) AS device, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(NULL AS Utf8) AS cpu, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] "###); // Sort should be iox::measurement, cpu, device, time assert_snapshot!(plan("SELECT cpu, usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, device"), @r###" Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, NULL AS device, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] - Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N] - Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.device AS device, NULL AS cpu, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N] - TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(NULL AS Utf8) AS device, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] "###); // Sort should be iox::measurement, device, time, cpu assert_snapshot!(plan("SELECT cpu, usage_idle, bytes_free FROM cpu, disk GROUP BY device"), @r###" Sort: iox::measurement ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, NULL AS device, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] - Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N] - Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.device AS device, NULL AS cpu, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N] - TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(NULL AS Utf8) AS device, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] "###); // If a tag specified in a GROUP BY does not exist across all measurements, it should be omitted from the sort assert_snapshot!(plan("SELECT cpu, usage_idle, bytes_free FROM cpu, disk GROUP BY device, non_existent"), @r###" Sort: iox::measurement ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Projection: iox::measurement, time, CAST(device AS Utf8) AS device, non_existent, CAST(cpu AS Utf8) AS cpu, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, non_existent:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, NULL AS device, NULL AS non_existent, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, non_existent:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] - Projection: iox::measurement, time, CAST(device AS Utf8) AS device, non_existent, CAST(cpu AS Utf8) AS cpu, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] - Sort: device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, non_existent:Null;N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N] - Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.device AS device, NULL AS non_existent, NULL AS cpu, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, non_existent:Null;N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N] - TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(NULL AS Utf8) AS device, NULL AS non_existent, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(disk.device AS Utf8) AS device, NULL AS non_existent, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] "###); // If a tag specified in a projection does not exist across all measurements, it should be omitted from the sort assert_snapshot!(plan("SELECT cpu, usage_idle, bytes_free, non_existent FROM cpu, disk GROUP BY device"), @r###" Sort: iox::measurement ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N] Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N] - Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, usage_idle, CAST(bytes_free AS Int64) AS bytes_free, non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N] - Sort: time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N, non_existent:Null;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, NULL AS device, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle, NULL AS bytes_free, NULL AS non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N, non_existent:Null;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] - Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, CAST(usage_idle AS Float64) AS usage_idle, bytes_free, non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N] - Sort: device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N, non_existent:Null;N] - Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.device AS device, NULL AS cpu, NULL AS usage_idle, disk.bytes_free AS bytes_free, NULL AS non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N, non_existent:Null;N] - TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(NULL AS Utf8) AS device, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free, NULL AS non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free, NULL AS non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N] + TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] "###); } #[test] fn test_select_group_by_limit_offset() { assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu LIMIT 1"), @r###" - Projection: iox::measurement, time, cpu, usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - Filter: iox::row <= Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] - WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] - Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Projection: iox::measurement, time, cpu, usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Filter: iox::row <= Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] + Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu OFFSET 1"), @r###" - Projection: iox::measurement, time, cpu, usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - Filter: iox::row > Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] - WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] - Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Projection: iox::measurement, time, cpu, usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Filter: iox::row > Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] + Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu LIMIT 1 OFFSET 1"), @r###" - Projection: iox::measurement, time, cpu, usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - Filter: iox::row BETWEEN Int64(2) AND Int64(2) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] - WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] - Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] - TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Projection: iox::measurement, time, cpu, usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Filter: iox::row BETWEEN Int64(2) AND Int64(2) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] + WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N] + Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); } diff --git a/iox_query_influxql/src/plan/planner/select.rs b/iox_query_influxql/src/plan/planner/select.rs index 5cd45bfa34..3e002353e3 100644 --- a/iox_query_influxql/src/plan/planner/select.rs +++ b/iox_query_influxql/src/plan/planner/select.rs @@ -6,7 +6,7 @@ use datafusion_util::AsExpr; use generated_types::influxdata::iox::querier::v1::influx_ql_metadata::TagKeyColumn; use influxdb_influxql_parser::common::OrderByClause; use influxdb_influxql_parser::expression::{Expr as IQLExpr, VarRef, VarRefDataType}; -use influxdb_influxql_parser::select::{Field, SelectStatement}; +use influxdb_influxql_parser::select::Field; use schema::INFLUXQL_MEASUREMENT_COLUMN_NAME; use std::collections::HashMap; use std::ops::Deref; @@ -74,7 +74,8 @@ pub(super) fn make_tag_key_column_meta( /// Sort expressions referring to tag keys are always specified in lexicographically ascending order. pub(super) fn plan_with_sort( plan: LogicalPlan, - select: &SelectStatement, + time_sort_expr: Expr, + sort_by_measurement: bool, group_by_tag_set: &[&str], projection_tag_set: &[&str], ) -> Result { @@ -82,7 +83,7 @@ pub(super) fn plan_with_sort( // NOTE: Ideally DataFusion would maintain the order of the UNION ALL, which would eliminate // the need to sort by measurement. // See: https://github.com/influxdata/influxdb_iox/issues/7062 - let mut series_sort = if matches!(plan, LogicalPlan::Union(_)) { + let mut series_sort = if sort_by_measurement { vec![Expr::sort( INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr(), true, @@ -116,7 +117,7 @@ pub(super) fn plan_with_sort( series_sort.extend(map_to_expr(schema, group_by_tag_set)); }; - series_sort.push(select.order_by.to_sort_expr()); + series_sort.push(time_sort_expr); series_sort.extend(map_to_expr(schema, projection_tag_set));