feat: outer GROUP BY pushed down to subqueries; more Cloud 2 examples
parent
e75a95bca7
commit
7ba619a32b
|
@ -548,7 +548,56 @@ SELECT * FROM (SELECT usage_idle FROM cpu GROUP BY cpu) GROUP BY cpu;
|
|||
-- GROUP BY should be pushed to subquery
|
||||
SELECT * FROM (SELECT usage_idle FROM cpu) GROUP BY cpu;
|
||||
|
||||
-- GROUP BY should be pushed through multiple levels of subqueries
|
||||
SELECT * FROM (SELECT MAX(value) FROM (SELECT DISTINCT(usage_idle) AS value FROM cpu)) GROUP BY cpu;
|
||||
|
||||
-- GROUP BY time with default FILL(none) for subquery
|
||||
SELECT value FROM (SELECT mean(usage_idle) AS value FROM cpu GROUP BY TIME(10s));
|
||||
SELECT value FROM (SELECT mean(usage_idle) AS value FROM cpu GROUP BY TIME(10s));
|
||||
|
||||
-- Multiple subqueries from same measurement, rows with same timestamp are not combined
|
||||
SELECT sum_idle, sum_system FROM (SELECT sum(usage_idle) AS sum_idle FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)), (SELECT sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s));
|
||||
|
||||
-- Similar query as prior, rows with same timestamp are combined (as expected)
|
||||
SELECT sum_idle, sum_system FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s));
|
||||
|
||||
-- TODO(sgc): Incorrect output
|
||||
-- Multiple subqueries from same measurement
|
||||
-- * Outer query projects aggregates, values at same timestamp should be combined
|
||||
-- Source: Cloud 2
|
||||
-- Expected output:
|
||||
-- name: cpu
|
||||
-- +---------------------+------+--------+
|
||||
-- | time | last | last_1 |
|
||||
-- +---------------------+------+--------+
|
||||
-- | 1970-01-01T00:00:00 | 2.99 | 2.1 |
|
||||
-- +---------------------+------+--------+
|
||||
-- SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)), (SELECT sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s));
|
||||
|
||||
-- Similar query produces expected output of prior query
|
||||
SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s));
|
||||
|
||||
|
||||
-- TODO(sgc): Incorrect output
|
||||
-- Multiple subqueries from same measurement, GROUP BY TIME
|
||||
-- * Outer query projects aggregates, values at same timestamp should be combined
|
||||
-- Source: Cloud 2
|
||||
-- Expected output:
|
||||
-- name: cpu
|
||||
-- +---------------------+------+--------+
|
||||
-- | time | last | last_1 |
|
||||
-- +---------------------+------+--------+
|
||||
-- | 2022-10-31T02:00:00 | 2.98 | 2.2 |
|
||||
-- | 2022-10-31T02:00:10 | 2.99 | 2.1 |
|
||||
-- +---------------------+------+--------+
|
||||
-- SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)), (SELECT sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)) GROUP BY time(10s) FILL(none);
|
||||
|
||||
-- Similar query produces expected output of prior query
|
||||
SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)) GROUP BY time(10s) FILL(none);
|
||||
|
||||
-- Conditions on fields projected of subquery
|
||||
-- Source: Cloud 2
|
||||
SELECT P / 99.99 FROM (SELECT max(usage_system) AS P, mean(usage_idle) AS S FROM cpu GROUP BY time(10s)) WHERE S > 0.50 AND S < 1.50 AND P > 0.0 AND P < 1.0 GROUP BY cpu FILL(null);
|
||||
|
||||
-- DISTINCT tag value from subquery
|
||||
-- Source: Cloud 2
|
||||
SELECT distinct(cpu) FROM (SELECT usage_idle, cpu FROM cpu);
|
|
@ -2850,4 +2850,55 @@ name: cpu
|
|||
+---------------------+--------------------+
|
||||
| 2022-10-31T02:00:00 | 1.9799999999999998 |
|
||||
| 2022-10-31T02:00:10 | 1.9900000000000002 |
|
||||
+---------------------+--------------------+
|
||||
+---------------------+--------------------+
|
||||
-- InfluxQL: SELECT sum_idle, sum_system FROM (SELECT sum(usage_idle) AS sum_idle FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)), (SELECT sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s));
|
||||
name: cpu
|
||||
+---------------------+----------+------------+
|
||||
| time | sum_idle | sum_system |
|
||||
+---------------------+----------+------------+
|
||||
| 2022-10-31T02:00:00 | 2.98 | |
|
||||
| 2022-10-31T02:00:00 | | 2.2 |
|
||||
| 2022-10-31T02:00:10 | 2.99 | |
|
||||
| 2022-10-31T02:00:10 | | 2.1 |
|
||||
+---------------------+----------+------------+
|
||||
-- InfluxQL: SELECT sum_idle, sum_system FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s));
|
||||
name: cpu
|
||||
+---------------------+----------+------------+
|
||||
| time | sum_idle | sum_system |
|
||||
+---------------------+----------+------------+
|
||||
| 2022-10-31T02:00:00 | 2.98 | 2.2 |
|
||||
| 2022-10-31T02:00:10 | 2.99 | 2.1 |
|
||||
+---------------------+----------+------------+
|
||||
-- InfluxQL: SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s));
|
||||
name: cpu
|
||||
+---------------------+------+--------+
|
||||
| time | last | last_1 |
|
||||
+---------------------+------+--------+
|
||||
| 1970-01-01T00:00:00 | 2.99 | 2.1 |
|
||||
+---------------------+------+--------+
|
||||
-- InfluxQL: SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)) GROUP BY time(10s) FILL(none);
|
||||
name: cpu
|
||||
+---------------------+------+--------+
|
||||
| time | last | last_1 |
|
||||
+---------------------+------+--------+
|
||||
| 2022-10-31T02:00:00 | 2.98 | 2.2 |
|
||||
| 2022-10-31T02:00:10 | 2.99 | 2.1 |
|
||||
+---------------------+------+--------+
|
||||
-- InfluxQL: SELECT P / 99.99 FROM (SELECT max(usage_system) AS P, mean(usage_idle) AS S FROM cpu GROUP BY time(10s)) WHERE S > 0.50 AND S < 1.50 AND P > 0.0 AND P < 1.0 GROUP BY cpu FILL(null);
|
||||
name: cpu
|
||||
tags: cpu=cpu0
|
||||
+---------------------+-------------------+
|
||||
| time | P |
|
||||
+---------------------+-------------------+
|
||||
| 2022-10-31T02:00:00 | 0.002000200020002 |
|
||||
| 2022-10-31T02:00:10 | 0.001000100010001 |
|
||||
+---------------------+-------------------+
|
||||
-- InfluxQL: SELECT distinct(cpu) FROM (SELECT usage_idle, cpu FROM cpu);
|
||||
name: cpu
|
||||
+---------------------+-----------+
|
||||
| time | distinct |
|
||||
+---------------------+-----------+
|
||||
| 1970-01-01T00:00:00 | cpu-total |
|
||||
| 1970-01-01T00:00:00 | cpu0 |
|
||||
| 1970-01-01T00:00:00 | cpu1 |
|
||||
+---------------------+-----------+
|
|
@ -31,8 +31,11 @@ pub(super) struct SelectQuery {
|
|||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub(super) struct Select {
|
||||
/// The schema of the selection.
|
||||
// pub(super) schema: Todo,
|
||||
/// The depth of the selection, where a value > 0 indicates
|
||||
/// this is a subquery.
|
||||
pub(super) depth: u32,
|
||||
|
||||
/// The projection type of the selection.
|
||||
pub(super) projection_type: ProjectionType,
|
||||
|
||||
/// Projection clause of the selection.
|
||||
|
|
|
@ -325,153 +325,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
.with_has_multiple_measurements(query.has_multiple_measurements)
|
||||
.with_root_group_by_tags(&group_by_tags);
|
||||
|
||||
// Skip the `time` column
|
||||
let fields_no_time = &select.fields[1..];
|
||||
// always start with the time column
|
||||
let mut fields = vec![select.fields.first().cloned().unwrap()];
|
||||
|
||||
// group_by_tag_set : a list of tag columns specified in the GROUP BY clause
|
||||
// projection_tag_set : a list of tag columns specified exclusively in the SELECT projection
|
||||
// is_projected : a list of booleans indicating whether matching elements in the
|
||||
// group_by_tag_set are also projected in the query
|
||||
let (group_by_tag_set, projection_tag_set, is_projected) =
|
||||
if let Some(group_by) = &select.group_by {
|
||||
let mut tag_columns =
|
||||
find_tag_and_unknown_columns(fields_no_time).collect::<HashSet<_>>();
|
||||
|
||||
// Find the list of tag keys specified in the `GROUP BY` clause, and
|
||||
// whether any of the tag keys are also projected in the SELECT list.
|
||||
let (tag_set, is_projected): (Vec<_>, Vec<_>) = group_by
|
||||
.tags()
|
||||
.map(|t| t.deref().as_str())
|
||||
.map(|s| (s, tag_columns.contains(s)))
|
||||
// We sort the tag set, to ensure correct ordering of the results. The tag columns
|
||||
// referenced in the `tag_set` variable are added to the sort operator in
|
||||
// lexicographically ascending order.
|
||||
.sorted_by(|a, b| a.0.cmp(b.0))
|
||||
.unzip();
|
||||
|
||||
// Tags specified in the `GROUP BY` clause that are not already added to the
|
||||
// projection must be projected, so they can be used in the group key.
|
||||
//
|
||||
// At the end of the loop, the `tag_columns` set will contain the tag columns that
|
||||
// exist in the projection and not in the `GROUP BY`.
|
||||
fields.extend(
|
||||
tag_set
|
||||
.iter()
|
||||
.filter_map(|col| match tag_columns.remove(*col) {
|
||||
true => None,
|
||||
false => Some(Field {
|
||||
expr: IQLExpr::VarRef(VarRef {
|
||||
name: (*col).into(),
|
||||
data_type: Some(VarRefDataType::Tag),
|
||||
}),
|
||||
name: col.to_string(),
|
||||
data_type: Some(InfluxColumnType::Tag),
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
(
|
||||
tag_set,
|
||||
tag_columns.into_iter().sorted().collect::<Vec<_>>(),
|
||||
is_projected,
|
||||
)
|
||||
} else {
|
||||
let tag_columns = find_tag_and_unknown_columns(fields_no_time)
|
||||
.sorted()
|
||||
.collect::<Vec<_>>();
|
||||
(vec![], tag_columns, vec![])
|
||||
};
|
||||
|
||||
fields.extend(fields_no_time.iter().cloned());
|
||||
|
||||
let plan = {
|
||||
let mut iter = select.from.iter();
|
||||
let plan = match iter.next() {
|
||||
Some(ds) => self.project_select(&ctx, ds, select, &fields, &group_by_tag_set),
|
||||
None => {
|
||||
// empty result, but let's at least have all the strictly necessary metadata
|
||||
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
|
||||
INFLUXQL_MEASUREMENT_COLUMN_NAME,
|
||||
(&InfluxColumnType::Tag).into(),
|
||||
false,
|
||||
)]));
|
||||
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
|
||||
produce_one_row: false,
|
||||
schema: schema.to_dfschema_ref()?,
|
||||
});
|
||||
let plan = plan_with_metadata(
|
||||
plan,
|
||||
&InfluxQlMetadata {
|
||||
measurement_column_index: MEASUREMENT_COLUMN_INDEX,
|
||||
tag_key_columns: vec![],
|
||||
},
|
||||
)?;
|
||||
return Ok(plan);
|
||||
}
|
||||
}?;
|
||||
|
||||
iter.try_fold(plan, |prev, ds| {
|
||||
let next = self.project_select(&ctx, ds, select, &fields, &group_by_tag_set)?;
|
||||
LogicalPlanBuilder::from(prev).union(next)?.build()
|
||||
})?
|
||||
};
|
||||
|
||||
let plan = plan_with_metadata(
|
||||
plan,
|
||||
&InfluxQlMetadata {
|
||||
measurement_column_index: MEASUREMENT_COLUMN_INDEX,
|
||||
tag_key_columns: make_tag_key_column_meta(
|
||||
&fields,
|
||||
&group_by_tag_set,
|
||||
&is_projected,
|
||||
),
|
||||
},
|
||||
)?;
|
||||
|
||||
// the sort planner node must refer to the time column using
|
||||
// the alias that was specified
|
||||
let time_alias = fields[0].name.as_str();
|
||||
|
||||
let time_sort_expr = time_alias.as_expr().sort(
|
||||
match select.order_by {
|
||||
// Default behaviour is to sort by time in ascending order if there is no ORDER BY
|
||||
None | Some(OrderByClause::Ascending) => true,
|
||||
Some(OrderByClause::Descending) => false,
|
||||
},
|
||||
false,
|
||||
);
|
||||
|
||||
let plan = plan_with_sort(
|
||||
plan,
|
||||
vec![time_sort_expr.clone()],
|
||||
ctx.has_multiple_measurements,
|
||||
&group_by_tag_set,
|
||||
&projection_tag_set,
|
||||
)?;
|
||||
|
||||
let plan = self.limit(
|
||||
plan,
|
||||
select.offset,
|
||||
select.limit,
|
||||
vec![time_sort_expr],
|
||||
ctx.has_multiple_measurements,
|
||||
&group_by_tag_set,
|
||||
&projection_tag_set,
|
||||
)?;
|
||||
|
||||
Ok(plan)
|
||||
self.select_to_plan(&ctx, select)
|
||||
}
|
||||
|
||||
fn select_to_plan(&self, ctx: &Context<'_>, select: &Select) -> Result<LogicalPlan> {
|
||||
let ctx = Context::new()
|
||||
.with_projection_type(select.projection_type)
|
||||
.with_timezone(select.timezone)
|
||||
.with_group_by_fill(select)
|
||||
.with_has_multiple_measurements(ctx.has_multiple_measurements)
|
||||
.with_root_group_by_tags(ctx.root_group_by_tags);
|
||||
|
||||
// Skip the `time` column
|
||||
let fields_no_time = &select.fields[1..];
|
||||
// always start with the time column
|
||||
|
@ -561,6 +418,23 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
})?
|
||||
};
|
||||
|
||||
let plan = if select.depth == 0 {
|
||||
// Add the metadata to the root SELECT query
|
||||
plan_with_metadata(
|
||||
plan,
|
||||
&InfluxQlMetadata {
|
||||
measurement_column_index: MEASUREMENT_COLUMN_INDEX,
|
||||
tag_key_columns: make_tag_key_column_meta(
|
||||
&fields,
|
||||
&group_by_tag_set,
|
||||
&is_projected,
|
||||
),
|
||||
},
|
||||
)?
|
||||
} else {
|
||||
plan
|
||||
};
|
||||
|
||||
// the sort planner node must refer to the time column using
|
||||
// the alias that was specified
|
||||
let time_alias = fields[0].name.as_str();
|
||||
|
@ -1369,10 +1243,19 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
vec![lit_dict(table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME)],
|
||||
))
|
||||
}
|
||||
DataSource::Subquery(select) => Ok((
|
||||
self.select_to_plan(ctx, select)?,
|
||||
vec![INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr()],
|
||||
)),
|
||||
DataSource::Subquery(select) => {
|
||||
let ctx = Context::new()
|
||||
.with_projection_type(select.projection_type)
|
||||
.with_timezone(select.timezone)
|
||||
.with_group_by_fill(select)
|
||||
.with_has_multiple_measurements(ctx.has_multiple_measurements)
|
||||
.with_root_group_by_tags(ctx.root_group_by_tags);
|
||||
|
||||
Ok((
|
||||
self.select_to_plan(&ctx, select)?,
|
||||
vec![INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr()],
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue