refactor: `limit` function supports multiple measurements and tag sets (#7464)

* fix: Add sort operator after window aggregate operator

Closes #7460

* fix: Refactor `LIMIT` and `OFFSET` implementation

These changes should allow the `limit` function to be used
generically with any plan following the same conventions.

* chore: No need to reorder this

* chore: Add documentation to the `limit` function
pull/24376/head
Stuart Carnie 2023-04-09 07:22:54 +10:00 committed by GitHub
parent 250b7a0cd6
commit b9479a2b3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 266 additions and 244 deletions

View File

@ -389,6 +389,9 @@ SELECT usage_idle FROM cpu WHERE cpu = 'cpu-total' LIMIT 2;
SELECT usage_idle FROM cpu WHERE cpu = 'cpu-total' ORDER BY DESC LIMIT 2;
SELECT usage_idle FROM cpu WHERE cpu = 'cpu-total' LIMIT 2 OFFSET 1;
SELECT usage_idle FROM cpu WHERE cpu = 'cpu-total' ORDER BY DESC LIMIT 2 OFFSET 1;
-- correct final ordering
-- See: https://github.com/influxdata/influxdb_iox/issues/7460
SELECT f64 FROM m2 GROUP BY tag0 LIMIT 1;
-- raw query, multiple measurements
SELECT usage_idle, bytes_free FROM cpu, disk WHERE cpu = 'cpu-total' OR device = 'disk1s1' LIMIT 2;
@ -428,7 +431,6 @@ SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-1
-- aggregate, group by TIME and tag, multiple measurements
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device LIMIT 1;
--
-- Unimplemented cases
--

View File

@ -1080,6 +1080,21 @@ Error while planning query: Error during planning: mixing aggregate and non-aggr
+------------------+----------------------+------------+
| cpu | 2022-10-31T02:00:00Z | 2.98 |
+------------------+----------------------+------------+
-- InfluxQL: SELECT f64 FROM m2 GROUP BY tag0 LIMIT 1;
+------------------+----------------------+-------+------+
| iox::measurement | time | tag0 | f64 |
+------------------+----------------------+-------+------+
| m2 | 2022-10-31T02:00:00Z | val00 | 0.98 |
| m2 | 2022-10-31T02:00:00Z | val01 | 2.98 |
| m2 | 2022-10-31T02:00:00Z | val02 | 1.98 |
| m2 | 2022-10-31T02:00:00Z | val03 | 4.98 |
| m2 | 2022-10-31T02:00:00Z | val04 | 9.98 |
| m2 | 2022-10-31T02:00:00Z | val05 | 3.98 |
| m2 | 2022-10-31T02:00:00Z | val07 | 8.98 |
| m2 | 2022-10-31T02:00:00Z | val08 | 7.98 |
| m2 | 2022-10-31T02:00:00Z | val09 | 5.98 |
| m2 | 2022-10-31T02:00:00Z | val10 | 6.98 |
+------------------+----------------------+-------+------+
-- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk WHERE cpu = 'cpu-total' OR device = 'disk1s1' LIMIT 2;
+------------------+----------------------+------------+------------+
| iox::measurement | time | usage_idle | bytes_free |

View File

@ -1246,6 +1246,16 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
disk,host=host1,device=disk1s2 bytes_free=2239i,bytes_used=319833i 1667181610000000000
disk,host=host1,device=disk1s5 bytes_free=3234i,bytes_used=419838i 1667181600000000000
disk,host=host1,device=disk1s5 bytes_free=3239i,bytes_used=419833i 1667181610000000000
m2,tag0=val00 f64=0.98 1667181600000000000
m2,tag0=val02 f64=1.98 1667181600000000000
m2,tag0=val01 f64=2.98 1667181600000000000
m2,tag0=val05 f64=3.98 1667181600000000000
m2,tag0=val03 f64=4.98 1667181600000000000
m2,tag0=val09 f64=5.98 1667181600000000000
m2,tag0=val10 f64=6.98 1667181600000000000
m2,tag0=val08 f64=7.98 1667181600000000000
m2,tag0=val07 f64=8.98 1667181600000000000
m2,tag0=val04 f64=9.98 1667181600000000000
"#
.to_string(),
),

View File

@ -29,6 +29,7 @@ use datafusion::logical_expr::{
};
use datafusion_util::{lit_dict, AsExpr};
use generated_types::influxdata::iox::querier::v1::InfluxQlMetadata;
use influxdb_influxql_parser::common::{LimitClause, OffsetClause};
use influxdb_influxql_parser::explain::{ExplainOption, ExplainStatement};
use influxdb_influxql_parser::expression::walk::walk_expr;
use influxdb_influxql_parser::expression::{
@ -39,7 +40,7 @@ use influxdb_influxql_parser::select::{
FillClause, GroupByClause, SLimitClause, SOffsetClause, TimeZoneClause,
};
use influxdb_influxql_parser::{
common::{LimitClause, MeasurementName, OffsetClause, WhereClause},
common::{MeasurementName, WhereClause},
expression::Expr as IQLExpr,
identifier::Identifier,
literal::Literal,
@ -57,6 +58,7 @@ use schema::{
};
use std::collections::{HashSet, VecDeque};
use std::fmt::Debug;
use std::iter;
use std::ops::{Bound, ControlFlow, Deref, Range};
use std::str::FromStr;
use std::sync::Arc;
@ -335,7 +337,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
select,
&fields,
&group_by_tag_set,
&projection_tag_set,
)? {
// Exclude any plans that produce no data, which is
// consistent with InfluxQL.
@ -352,15 +353,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// UNION the remaining plans
let plan = plans.into_iter().try_fold(plan, |prev, (next, proj)| {
let next = self.project_select(
&ctx,
next,
proj,
select,
&fields,
&group_by_tag_set,
&projection_tag_set,
)?;
let next = self.project_select(&ctx, next, proj, select, &fields, &group_by_tag_set)?;
if let LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
..
@ -386,20 +379,33 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
},
)?;
let plan = if let LogicalPlan::Union(_) = plan {
// If the result set is a Union and therefore produces multiple measurements, they must
// be sorted by measurement in ascending order.
plan_with_sort(plan, select, &group_by_tag_set, &projection_tag_set)?
} else {
plan
};
// true if the input plan is the UNION, indicating
// the result set produces multiple tables or measurements.
let is_multiple_measurements = matches!(plan, LogicalPlan::Union(_));
let plan = plan_with_sort(
plan,
select.order_by.to_sort_expr(),
is_multiple_measurements,
&group_by_tag_set,
&projection_tag_set,
)?;
let plan = self.limit(
plan,
select.offset,
select.limit,
select.order_by.to_sort_expr(),
is_multiple_measurements,
&group_by_tag_set,
&projection_tag_set,
)?;
let plan = self.slimit(plan, select.series_offset, select.series_limit)?;
Ok(plan)
}
#[allow(clippy::too_many_arguments)]
fn project_select(
&self,
ctx: &Context<'_>,
@ -408,7 +414,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
select: &SelectStatement,
fields: &[Field],
group_by_tag_set: &[&str],
projection_tag_set: &[&str],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
@ -440,110 +445,11 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let (plan, select_exprs_post_aggr) =
self.select_aggregate(ctx, plan, fields, select_exprs, group_by_tag_set, &schemas)?;
// projection expressions includes the `iox::measurement` column
let proj_exprs = proj
.into_iter()
.chain(select_exprs_post_aggr.into_iter())
.collect::<Vec<_>>();
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
let plan = project(plan, proj_exprs.clone())?;
let plan = plan_with_sort(plan, select, group_by_tag_set, projection_tag_set)?;
if select.offset.is_some() || select.limit.is_some() {
if group_by_tag_set.is_empty() {
// If the query is not grouping by tags, the
// DataFusion Limit operator is sufficient.
self.limit(plan, select.offset, select.limit)
} else {
// If the query includes a GROUP BY tag[, tag, ...], the LIMIT and OFFSET clauses
// are applied to each unique group. To accomplish this, construct a plan which uses
// the ROW_NUMBER windowing function.
// The name of the ROW_NUMBER window expression
const IOX_ROW_ALIAS: &str = "iox::row";
// Construct a ROW_NUMBER window expression:
//
// ROW_NUMBER() OVER (
// PARTITION BY [group_by_tag_set]
// ORDER BY time [ASC | DESC]
// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
// ) AS iox::row
let window_func_exprs = vec![Expr::WindowFunction(WindowFunction {
fun: window_function::WindowFunction::BuiltInWindowFunction(
BuiltInWindowFunction::RowNumber,
),
args: vec![],
partition_by: fields_to_exprs_no_nulls(plan.schema(), group_by_tag_set)
.collect::<Vec<_>>(),
order_by: vec![select.order_by.to_sort_expr()],
window_frame: WindowFrame {
units: WindowFrameUnits::Rows,
start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
end_bound: WindowFrameBound::CurrentRow,
},
})
.alias(IOX_ROW_ALIAS)];
let plan = LogicalPlanBuilder::from(plan)
.window(window_func_exprs)?
.build()?;
// Replace any expressions that are not a column with a column referencing
// an output column from the aggregate schema.
let proj_exprs = proj_exprs
.iter()
.map(|expr| expr_as_column_expr(expr, &plan))
.collect::<Result<Vec<Expr>>>()?;
let limit = select
.limit
.map(|v| <u64 as TryInto<i64>>::try_into(*v))
.transpose()
.map_err(|_| DataFusionError::Plan("limit out of range".to_owned()))?;
let offset = select
.offset
.map(|v| <u64 as TryInto<i64>>::try_into(*v))
.transpose()
.map_err(|_| DataFusionError::Plan("offset out of range".to_owned()))?;
// a reference to the ROW_NUMBER column.
let row_alias = IOX_ROW_ALIAS.as_expr();
let row_filter_expr = match (limit, offset) {
// WHERE "iox::row" BETWEEN OFFSET + 1 AND OFFSET + LIMIT
(Some(limit), Some(offset)) => {
let low = offset + 1;
let high = offset + limit;
Expr::Between(Between {
expr: Box::new(row_alias),
negated: false,
low: Box::new(lit(low)),
high: Box::new(lit(high)),
})
}
// WHERE "iox::row" <= LIMIT
(Some(limit), None) => row_alias.lt_eq(lit(limit)),
// WHERE "iox::row" > OFFSET
(None, Some(offset)) => row_alias.gt(lit(offset)),
(None, None) => unreachable!("limit and offset cannot not be None"),
};
LogicalPlanBuilder::from(plan)
// Filter by the LIMIT and OFFSET clause
.filter(row_filter_expr)?
// Project the output without the IOX_ROW_ALIAS column
.project(proj_exprs)?
.build()
}
} else {
Ok(plan)
}
project(
plan,
proj.into_iter().chain(select_exprs_post_aggr.into_iter()),
)
}
fn select_aggregate(
@ -723,22 +629,138 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Ok((plan, select_exprs_post_aggr))
}
/// Optionally wrap the input logical plan in a [`LogicalPlan::Limit`] node using the specified
/// `offset` and `limit`.
/// Generate a plan that partitions the input data into groups, first omitting a specified
/// number of rows, followed by restricting the quantity of rows within each group.
///
/// ## Arguments
///
/// - `input`: The plan to apply the limit to.
/// - `offset`: The number of input rows to skip.
/// - `limit`: The maximum number of rows to return in the output plan per group.
/// - `time_sort_expr`: An `Expr::Sort` referring to the `time` column of the input.
/// - `is_multiple_measurements`: `true` if the `input` produces multiple measurements,
/// and therefore the limit should be applied per measurement and any additional group tags.
/// - `group_by_tag_set`: Tag columns from the `input` plan that should be used to partition
/// the `input` plan and sort the `output` plan.
/// - `projection_tag_set`: Additional tag columns that should be used to sort the `output`
/// plan.
#[allow(clippy::too_many_arguments)]
fn limit(
&self,
input: LogicalPlan,
offset: Option<OffsetClause>,
limit: Option<LimitClause>,
time_sort_expr: Expr,
is_multiple_measurements: bool,
group_by_tag_set: &[&str],
projection_tag_set: &[&str],
) -> Result<LogicalPlan> {
if offset.is_none() && limit.is_none() {
return Ok(input);
}
if group_by_tag_set.is_empty() && !is_multiple_measurements {
// If the query is not grouping by tags, and is a single measurement, the DataFusion
// Limit operator is sufficient.
let skip = offset.map_or(0, |v| *v as usize);
let fetch = limit.map(|v| *v as usize);
LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
} else {
// If the query includes a GROUP BY tag[, tag, ...], the LIMIT and OFFSET clauses
// are applied to each unique group. To accomplish this, construct a plan which uses
// the ROW_NUMBER windowing function.
// The name of the ROW_NUMBER window expression
const IOX_ROW_ALIAS: &str = "iox::row";
// Construct a ROW_NUMBER window expression:
//
// ROW_NUMBER() OVER (
// PARTITION BY [iox::measurement, group_by_tag_set]
// ORDER BY time [ASC | DESC]
// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
// ) AS iox::row
let window_func_exprs = vec![Expr::WindowFunction(WindowFunction {
fun: window_function::WindowFunction::BuiltInWindowFunction(
BuiltInWindowFunction::RowNumber,
),
args: vec![],
partition_by: iter::once(INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr())
.chain(fields_to_exprs_no_nulls(input.schema(), group_by_tag_set))
.collect::<Vec<_>>(),
order_by: vec![time_sort_expr.clone()],
window_frame: WindowFrame {
units: WindowFrameUnits::Rows,
start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
end_bound: WindowFrameBound::CurrentRow,
},
})
.alias(IOX_ROW_ALIAS)];
// Prepare new projection.
let proj_exprs = input
.schema()
.fields()
.iter()
.map(|expr| Expr::Column(expr.unqualified_column()))
.collect::<Vec<_>>();
let plan = LogicalPlanBuilder::from(input)
.window(window_func_exprs)?
.build()?;
let limit = limit
.map(|v| <u64 as TryInto<i64>>::try_into(*v))
.transpose()
.map_err(|_| DataFusionError::Plan("limit out of range".to_owned()))?;
let offset = offset
.map(|v| <u64 as TryInto<i64>>::try_into(*v))
.transpose()
.map_err(|_| DataFusionError::Plan("offset out of range".to_owned()))?;
// a reference to the ROW_NUMBER column.
let row_alias = IOX_ROW_ALIAS.as_expr();
let row_filter_expr = match (limit, offset) {
// WHERE "iox::row" BETWEEN OFFSET + 1 AND OFFSET + LIMIT
(Some(limit), Some(offset)) => {
let low = offset + 1;
let high = offset + limit;
Expr::Between(Between {
expr: Box::new(row_alias),
negated: false,
low: Box::new(lit(low)),
high: Box::new(lit(high)),
})
}
// WHERE "iox::row" <= LIMIT
(Some(limit), None) => row_alias.lt_eq(lit(limit)),
// WHERE "iox::row" > OFFSET
(None, Some(offset)) => row_alias.gt(lit(offset)),
(None, None) => unreachable!("limit and offset cannot not be None"),
};
let plan = LogicalPlanBuilder::from(plan)
// Filter by the LIMIT and OFFSET clause
.filter(row_filter_expr)?
// Project the output without the IOX_ROW_ALIAS column
.project(proj_exprs)?
.build()?;
// For consistency with InfluxQL, the final results must be sorted by
// the tag set from the GROUP BY
plan_with_sort(
plan,
time_sort_expr,
is_multiple_measurements,
group_by_tag_set,
projection_tag_set,
)
}
}
/// Verifies the `SLIMIT` and `SOFFSET` clauses are `None`; otherwise, return a
@ -1778,13 +1800,9 @@ mod test {
assert_snapshot!(plan("SELECT host, cpu, device, usage_idle, bytes_used FROM cpu, disk"), @r###"
Sort: iox::measurement ASC NULLS LAST, time ASC NULLS LAST, cpu ASC NULLS LAST, device ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N]
Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N]
Projection: iox::measurement, time, host, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, usage_idle, CAST(bytes_used AS Int64) AS bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N]
Sort: time ASC NULLS LAST, cpu ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_used:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.host AS host, cpu.cpu AS cpu, NULL AS device, cpu.usage_idle AS usage_idle, NULL AS bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_used:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.host AS host, CAST(cpu.cpu AS Utf8) AS cpu, CAST(NULL AS Utf8) AS device, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
Projection: iox::measurement, time, host, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, CAST(usage_idle AS Float64) AS usage_idle, bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N]
Sort: time ASC NULLS LAST, device ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_used:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.host AS host, NULL AS cpu, disk.device AS device, NULL AS usage_idle, disk.bytes_used AS bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_used:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.host AS host, CAST(NULL AS Utf8) AS cpu, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Float64) AS usage_idle, disk.bytes_used AS bytes_used [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_used:Int64;N]
TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)]
"###);
@ -1800,10 +1818,8 @@ mod test {
assert_snapshot!(plan("SELECT host, usage_idle FROM cpu, cpu"), @r###"
Sort: iox::measurement ASC NULLS LAST, time ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Sort: time ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.host AS host, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
Sort: time ASC NULLS LAST, host ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.host AS host, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), host:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
@ -2312,27 +2328,30 @@ mod test {
#[test]
fn with_limit_or_offset() {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo LIMIT 1"), @r###"
Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Projection: iox::measurement, time, foo, count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Filter: iox::row <= Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N]
Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo OFFSET 1"), @r###"
Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Projection: iox::measurement, time, foo, count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Filter: iox::row > Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N]
Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo LIMIT 2 OFFSET 3"), @r###"
Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Projection: iox::measurement, time, foo, count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Filter: iox::row BETWEEN Int64(4) AND Int64(5) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, foo] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N, iox::row:UInt64;N]
Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N]
@ -2508,13 +2527,9 @@ mod test {
assert_snapshot!(plan("SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu"), @r###"
Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, NULL AS cpu, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)]
"###);
@ -2522,13 +2537,9 @@ mod test {
assert_snapshot!(plan("SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY device, cpu"), @r###"
Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, NULL AS device, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(cpu.cpu AS Utf8) AS cpu, CAST(NULL AS Utf8) AS device, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, NULL AS cpu, disk.device AS device, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(NULL AS Utf8) AS cpu, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)]
"###);
@ -2536,13 +2547,9 @@ mod test {
assert_snapshot!(plan("SELECT device, usage_idle, bytes_free FROM cpu, disk GROUP BY cpu"), @r###"
Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, time ASC NULLS LAST, device ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, NULL AS device, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, device:Null;N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(cpu.cpu AS Utf8) AS cpu, CAST(NULL AS Utf8) AS device, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
Projection: iox::measurement, time, CAST(cpu AS Utf8) AS cpu, CAST(device AS Utf8) AS device, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: time ASC NULLS LAST, device ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, NULL AS cpu, disk.device AS device, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Null;N, device:Dictionary(Int32, Utf8);N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(NULL AS Utf8) AS cpu, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Utf8;N, device:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)]
"###);
@ -2550,13 +2557,9 @@ mod test {
assert_snapshot!(plan("SELECT cpu, usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, device"), @r###"
Sort: iox::measurement ASC NULLS LAST, cpu ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, NULL AS device, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(NULL AS Utf8) AS device, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.device AS device, NULL AS cpu, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)]
"###);
@ -2564,13 +2567,9 @@ mod test {
assert_snapshot!(plan("SELECT cpu, usage_idle, bytes_free FROM cpu, disk GROUP BY device"), @r###"
Sort: iox::measurement ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, NULL AS device, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(NULL AS Utf8) AS device, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.device AS device, NULL AS cpu, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)]
"###);
@ -2578,13 +2577,9 @@ mod test {
assert_snapshot!(plan("SELECT cpu, usage_idle, bytes_free FROM cpu, disk GROUP BY device, non_existent"), @r###"
Sort: iox::measurement ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Projection: iox::measurement, time, CAST(device AS Utf8) AS device, non_existent, CAST(cpu AS Utf8) AS cpu, usage_idle, CAST(bytes_free AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, non_existent:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, NULL AS device, NULL AS non_existent, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle, NULL AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, non_existent:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(NULL AS Utf8) AS device, NULL AS non_existent, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
Projection: iox::measurement, time, CAST(device AS Utf8) AS device, non_existent, CAST(cpu AS Utf8) AS cpu, CAST(usage_idle AS Float64) AS usage_idle, bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
Sort: device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, non_existent:Null;N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.device AS device, NULL AS non_existent, NULL AS cpu, NULL AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, non_existent:Null;N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(disk.device AS Utf8) AS device, NULL AS non_existent, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, non_existent:Null;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N]
TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)]
"###);
@ -2592,13 +2587,9 @@ mod test {
assert_snapshot!(plan("SELECT cpu, usage_idle, bytes_free, non_existent FROM cpu, disk GROUP BY device"), @r###"
Sort: iox::measurement ASC NULLS LAST, device ASC NULLS LAST, time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N]
Union [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N]
Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, usage_idle, CAST(bytes_free AS Int64) AS bytes_free, non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N]
Sort: time ASC NULLS LAST, cpu ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N, non_existent:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, NULL AS device, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle, NULL AS bytes_free, NULL AS non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Null;N, cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, bytes_free:Null;N, non_existent:Null;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, CAST(NULL AS Utf8) AS device, CAST(cpu.cpu AS Utf8) AS cpu, cpu.usage_idle AS usage_idle, CAST(NULL AS Int64) AS bytes_free, NULL AS non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
Projection: iox::measurement, time, CAST(device AS Utf8) AS device, CAST(cpu AS Utf8) AS cpu, CAST(usage_idle AS Float64) AS usage_idle, bytes_free, non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N]
Sort: device ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N, non_existent:Null;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, disk.device AS device, NULL AS cpu, NULL AS usage_idle, disk.bytes_free AS bytes_free, NULL AS non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Dictionary(Int32, Utf8);N, cpu:Null;N, usage_idle:Null;N, bytes_free:Int64;N, non_existent:Null;N]
Projection: Dictionary(Int32, Utf8("disk")) AS iox::measurement, disk.time AS time, CAST(disk.device AS Utf8) AS device, CAST(NULL AS Utf8) AS cpu, CAST(NULL AS Float64) AS usage_idle, disk.bytes_free AS bytes_free, NULL AS non_existent [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), device:Utf8;N, cpu:Utf8;N, usage_idle:Float64;N, bytes_free:Int64;N, non_existent:Null;N]
TableScan: disk [bytes_free:Int64;N, bytes_used:Int64;N, device:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)]
"###);
}
@ -2606,25 +2597,28 @@ mod test {
#[test]
fn test_select_group_by_limit_offset() {
assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu LIMIT 1"), @r###"
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Projection: iox::measurement, time, cpu, usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Filter: iox::row <= Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N]
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu OFFSET 1"), @r###"
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Projection: iox::measurement, time, cpu, usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Filter: iox::row > Int64(1) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N]
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
assert_snapshot!(plan("SELECT usage_idle FROM cpu GROUP BY cpu LIMIT 1 OFFSET 1"), @r###"
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Projection: iox::measurement, time, cpu, usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Filter: iox::row BETWEEN Int64(2) AND Int64(2) [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement, cpu] ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N, iox::row:UInt64;N]
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, cpu.time AS time, cpu.cpu AS cpu, cpu.usage_idle AS usage_idle [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cpu:Dictionary(Int32, Utf8);N, usage_idle:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]

View File

@ -6,7 +6,7 @@ use datafusion_util::AsExpr;
use generated_types::influxdata::iox::querier::v1::influx_ql_metadata::TagKeyColumn;
use influxdb_influxql_parser::common::OrderByClause;
use influxdb_influxql_parser::expression::{Expr as IQLExpr, VarRef, VarRefDataType};
use influxdb_influxql_parser::select::{Field, SelectStatement};
use influxdb_influxql_parser::select::Field;
use schema::INFLUXQL_MEASUREMENT_COLUMN_NAME;
use std::collections::HashMap;
use std::ops::Deref;
@ -74,7 +74,8 @@ pub(super) fn make_tag_key_column_meta(
/// Sort expressions referring to tag keys are always specified in lexicographically ascending order.
pub(super) fn plan_with_sort(
plan: LogicalPlan,
select: &SelectStatement,
time_sort_expr: Expr,
sort_by_measurement: bool,
group_by_tag_set: &[&str],
projection_tag_set: &[&str],
) -> Result<LogicalPlan> {
@ -82,7 +83,7 @@ pub(super) fn plan_with_sort(
// NOTE: Ideally DataFusion would maintain the order of the UNION ALL, which would eliminate
// the need to sort by measurement.
// See: https://github.com/influxdata/influxdb_iox/issues/7062
let mut series_sort = if matches!(plan, LogicalPlan::Union(_)) {
let mut series_sort = if sort_by_measurement {
vec![Expr::sort(
INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr(),
true,
@ -116,7 +117,7 @@ pub(super) fn plan_with_sort(
series_sort.extend(map_to_expr(schema, group_by_tag_set));
};
series_sort.push(select.order_by.to_sort_expr());
series_sort.push(time_sort_expr);
series_sort.extend(map_to_expr(schema, projection_tag_set));