diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs
index b059fd0dba..736b8b9ea0 100644
--- a/query/src/frontend/influxrpc.rs
+++ b/query/src/frontend/influxrpc.rs
@@ -1070,7 +1070,7 @@ impl InfluxRpcPlanner {
         let AggExprs {
             agg_exprs,
             field_columns,
-        } = AggExprs::try_new(agg, &schema, &predicate)?;
+        } = AggExprs::try_new_for_read_group(agg, &schema, &predicate)?;

         let plan_builder = plan_builder
             .aggregate(group_exprs, agg_exprs)
@@ -1190,10 +1190,10 @@ impl InfluxRpcPlanner {
             .chain(std::iter::once(window_bound))
             .collect::<Vec<_>>();

-        // aggregate each field
-        let agg_exprs = filtered_fields_iter(&schema, &predicate)
-            .map(|field| make_agg_expr(agg, field.name()))
-            .collect::<Result<Vec<_>>>()?;
+        let AggExprs {
+            agg_exprs,
+            field_columns,
+        } = AggExprs::try_new_for_read_window_aggregate(agg, &schema, &predicate)?;

         // sort by the group by expressions as well
         let sort_exprs = group_exprs
@@ -1215,21 +1215,12 @@ impl InfluxRpcPlanner {
             .map(|field| Arc::from(field.name().as_str()))
             .collect();

-        let field_columns = filtered_fields_iter(&schema, &predicate)
-            .map(|field| Arc::from(field.name().as_str()))
-            .collect();
-
-        // TODO: remove the use of tag_columns and field_column names
-        // and instead use the schema directly)
-
-        let ss_plan = SeriesSetPlan::new_from_shared_timestamp(
+        Ok(Some(SeriesSetPlan::new(
             Arc::from(table_name),
             plan,
             tag_columns,
             field_columns,
-        );
-
-        Ok(Some(ss_plan))
+        )))
     }

     /// Create a plan that scans the specified table, and applies any
@@ -1461,75 +1452,144 @@ fn filtered_fields_iter<'a>(
 /// the rules explained on `read_group_plan`
 impl AggExprs {
     /// Create the appropriate aggregate expressions, based on the type of the
-    /// field
-    pub fn try_new(agg: Aggregate, schema: &Schema, predicate: &Predicate) -> Result<Self> {
+    /// field for a `read_group` plan.
+    pub fn try_new_for_read_group(
+        agg: Aggregate,
+        schema: &Schema,
+        predicate: &Predicate,
+    ) -> Result<Self> {
         match agg {
             Aggregate::Sum | Aggregate::Count | Aggregate::Mean => {
-                // agg_function(_val1) as _value1
-                // ...
-                // agg_function(_valN) as _valueN
-                // agg_function(time) as time
-
-                let agg_exprs = filtered_fields_iter(schema, predicate)
-                    .chain(schema.time_iter())
-                    .map(|field| make_agg_expr(agg, field.name()))
-                    .collect::<Result<Vec<_>>>()?;
-
-                let field_columns = filtered_fields_iter(schema, predicate)
-                    .map(|field| Arc::from(field.name().as_str()))
-                    .collect::<Vec<_>>()
-                    .into();
-
-                Ok(Self {
-                    agg_exprs,
-                    field_columns,
-                })
+                Self::agg_for_read_group(agg, schema, predicate)
             }
             Aggregate::First | Aggregate::Last | Aggregate::Min | Aggregate::Max => {
-                // agg_function(_val1) as _value1
-                // agg_function(time) as time1
-                // ..
-                // agg_function(_valN) as _valueN
-                // agg_function(time) as timeN
-
-                // might be nice to use a more functional style here
-                let mut agg_exprs = Vec::new();
-                let mut field_list = Vec::new();
-
-                for field in filtered_fields_iter(schema, predicate) {
-                    agg_exprs.push(make_selector_expr(
-                        agg,
-                        SelectorOutput::Value,
-                        field.name(),
-                        field.data_type(),
-                        field.name(),
-                    )?);
-
-                    let time_column_name = format!("{}_{}", TIME_COLUMN_NAME, field.name());
-
-                    agg_exprs.push(make_selector_expr(
-                        agg,
-                        SelectorOutput::Time,
-                        field.name(),
-                        field.data_type(),
-                        &time_column_name,
-                    )?);
-
-                    field_list.push((
-                        Arc::from(field.name().as_str()), // value name
-                        Arc::from(time_column_name.as_str()),
-                    ));
-                }
-
-                let field_columns = field_list.into();
-                Ok(Self {
-                    agg_exprs,
-                    field_columns,
-                })
+                Self::selector_aggregates(agg, schema, predicate)
             }
             Aggregate::None => InternalUnexpectedNoneAggregate.fail(),
         }
     }
+
+    /// Create the appropriate aggregate expressions, based on the type of the
+    /// field for a `read_window_aggregate` plan.
+    pub fn try_new_for_read_window_aggregate(
+        agg: Aggregate,
+        schema: &Schema,
+        predicate: &Predicate,
+    ) -> Result<Self> {
+        match agg {
+            Aggregate::Sum | Aggregate::Count | Aggregate::Mean => {
+                Self::agg_for_read_window_aggregate(agg, schema, predicate)
+            }
+            Aggregate::First | Aggregate::Last | Aggregate::Min | Aggregate::Max => {
+                Self::selector_aggregates(agg, schema, predicate)
+            }
+            Aggregate::None => InternalUnexpectedNoneAggregate.fail(),
+        }
+    }
+
+    // Creates special aggregate "selector" expressions for the fields in the
+    // provided schema. Selectors ensure that the relevant aggregate functions
+    // are also applied to a distinct time column for each field column.
+    //
+    // Equivalent SQL would look like:
+    //
+    // agg_function(_val1) as _value1
+    // agg_function(time) as time1
+    // ..
+    // agg_function(_valN) as _valueN
+    // agg_function(time) as timeN
+    fn selector_aggregates(agg: Aggregate, schema: &Schema, predicate: &Predicate) -> Result<Self> {
+        // might be nice to use a more functional style here
+        let mut agg_exprs = Vec::new();
+        let mut field_list = Vec::new();
+
+        for field in filtered_fields_iter(schema, predicate) {
+            agg_exprs.push(make_selector_expr(
+                agg,
+                SelectorOutput::Value,
+                field.name(),
+                field.data_type(),
+                field.name(),
+            )?);
+
+            let time_column_name = format!("{}_{}", TIME_COLUMN_NAME, field.name());
+
+            agg_exprs.push(make_selector_expr(
+                agg,
+                SelectorOutput::Time,
+                field.name(),
+                field.data_type(),
+                &time_column_name,
+            )?);
+
+            field_list.push((
+                Arc::from(field.name().as_str()), // value name
+                Arc::from(time_column_name.as_str()),
+            ));
+        }
+
+        let field_columns = field_list.into();
+        Ok(Self {
+            agg_exprs,
+            field_columns,
+        })
+    }
+
+    // Creates aggregate expressions for use in a read_group plan, which
+    // includes the time column.
+    //
+    // Equivalent SQL would look like this:
+    //
+    // agg_function(_val1) as _value1
+    // ...
+    // agg_function(_valN) as _valueN
+    // agg_function(time) as time
+    fn agg_for_read_group(agg: Aggregate, schema: &Schema, predicate: &Predicate) -> Result<Self> {
+        let agg_exprs = filtered_fields_iter(schema, predicate)
+            .chain(schema.time_iter())
+            .map(|field| make_agg_expr(agg, field.name()))
+            .collect::<Result<Vec<_>>>()?;
+
+        let field_columns = filtered_fields_iter(schema, predicate)
+            .map(|field| Arc::from(field.name().as_str()))
+            .collect::<Vec<_>>()
+            .into();
+
+        Ok(Self {
+            agg_exprs,
+            field_columns,
+        })
+    }
+
+    // Creates aggregate expressions for use in a read_window_aggregate plan. No
+    // aggregates are created for the time column because a
+    // `read_window_aggregate` plan uses a time column calculated from the
+    // window bounds.
+    //
+    // Equivalent SQL would look like this:
+    //
+    // agg_function(_val1) as _value1
+    // ...
+    // agg_function(_valN) as _valueN
+    fn agg_for_read_window_aggregate(
+        agg: Aggregate,
+        schema: &Schema,
+        predicate: &Predicate,
+    ) -> Result<Self> {
+        let agg_exprs = filtered_fields_iter(schema, predicate)
+            .map(|field| make_agg_expr(agg, field.name()))
+            .collect::<Result<Vec<_>>>()?;
+
+        let field_columns = filtered_fields_iter(schema, predicate)
+            .map(|field| Arc::from(field.name().as_str()))
+            .collect::<Vec<_>>()
+            .into();
+
+        Ok(Self {
+            agg_exprs,
+            field_columns,
+        })
+    }
 }

 /// Creates a DataFusion expression suitable for calculating an aggregate:
diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs
index c4095cb5d8..39ea4c6a37 100644
--- a/query_tests/src/influxrpc/read_window_aggregate.rs
+++ b/query_tests/src/influxrpc/read_window_aggregate.rs
@@ -1,5 +1,5 @@
 //! Tests for the Influx gRPC queries
-use crate::scenarios::*;
+use crate::scenarios::{util::all_scenarios_for_one_chunk, *};

 use server::{db::test_helpers::write_lp, utils::make_db};

@@ -272,3 +272,106 @@ async fn test_read_window_aggregate_months() {
     )
     .await;
 }
+
+// Test data to validate the fix for:
+// https://github.com/influxdata/influxdb_iox/issues/2697
+struct MeasurementForDefect2697 {}
+#[async_trait]
+impl DbSetup for MeasurementForDefect2697 {
+    async fn make(&self) -> Vec<DbScenario> {
+        let partition_key = "2021-01-01T00";
+
+        let lp = vec![
+            "mm,section=1a bar=5.0 1609459201000000011",
+            "mm,section=1a bar=0.28 1609459201000000031",
+            "mm,section=2b bar=4.0 1609459201000000009",
+            "mm,section=2b bar=6.0 1609459201000000015",
+            "mm,section=2b bar=1.2 1609459201000000022",
+            "mm,section=1a foo=1.0 1609459201000000001",
+            "mm,section=1a foo=3.0 1609459201000000005",
+            "mm,section=1a foo=11.24 1609459201000000024",
+            "mm,section=2b foo=2.0 1609459201000000002",
+        ];
+
+        all_scenarios_for_one_chunk(vec![], vec![], lp, "mm", partition_key).await
+    }
+}
+
+// See https://github.com/influxdata/influxdb_iox/issues/2697
+#[tokio::test]
+async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() {
+    let predicate = PredicateBuilder::default()
+        // time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
+        .timestamp_range(1609459201000000001, 1609459201000000031)
+        .build();
+
+    let agg = Aggregate::Min;
+    let every = WindowDuration::from_nanoseconds(10);
+    let offset = WindowDuration::from_nanoseconds(0);
+
+    // Because the windowed aggregate uses a selector aggregate (one of MIN,
+    // MAX, FIRST, LAST) we need to run a plan that brings along the timestamps
+    // for the chosen aggregate in the window.
+    //
+    // The window is defined by the `time` column
+    let expected_results = vec![
+        "+---------+--------------------------------+-----+--------------------------------+-------+--------------------------------+",
+        "| section | time                           | bar | time_bar                       | foo   | time_foo                       |",
+        "+---------+--------------------------------+-----+--------------------------------+-------+--------------------------------+",
+        "| 1a      | 2021-01-01T00:00:01.000000010Z |     |                                | 1     | 2021-01-01T00:00:01.000000001Z |",
+        "| 1a      | 2021-01-01T00:00:01.000000020Z | 5   | 2021-01-01T00:00:01.000000011Z |       |                                |",
+        "| 1a      | 2021-01-01T00:00:01.000000030Z |     |                                | 11.24 | 2021-01-01T00:00:01.000000024Z |",
+        "| 2b      | 2021-01-01T00:00:01.000000010Z | 4   | 2021-01-01T00:00:01.000000009Z | 2     | 2021-01-01T00:00:01.000000002Z |",
+        "| 2b      | 2021-01-01T00:00:01.000000020Z | 6   | 2021-01-01T00:00:01.000000015Z |       |                                |",
+        "| 2b      | 2021-01-01T00:00:01.000000030Z | 1.2 | 2021-01-01T00:00:01.000000022Z |       |                                |",
+        "+---------+--------------------------------+-----+--------------------------------+-------+--------------------------------+",
+    ];
+
+    run_read_window_aggregate_test_case(
+        MeasurementForDefect2697 {},
+        predicate,
+        agg,
+        every,
+        offset,
+        expected_results,
+    )
+    .await;
+}
+
+// See https://github.com/influxdata/influxdb_iox/issues/2697
+#[tokio::test]
+async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697() {
+    let predicate = PredicateBuilder::default()
+        // time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
+        .timestamp_range(1609459201000000001, 1609459201000000031)
+        .build();
+
+    let agg = Aggregate::Sum;
+    let every = WindowDuration::from_nanoseconds(10);
+    let offset = WindowDuration::from_nanoseconds(0);
+
+    // The windowed aggregate uses a non-selector aggregate (SUM, COUNT, MEAN).
+    // For each distinct series, the window defines the `time` column
+    let expected_results = vec![
+        "+---------+--------------------------------+-----+-------+",
+        "| section | time                           | bar | foo   |",
+        "+---------+--------------------------------+-----+-------+",
+        "| 1a      | 2021-01-01T00:00:01.000000010Z |     | 4     |",
+        "| 1a      | 2021-01-01T00:00:01.000000020Z | 5   |       |",
+        "| 1a      | 2021-01-01T00:00:01.000000030Z |     | 11.24 |",
+        "| 2b      | 2021-01-01T00:00:01.000000010Z | 4   | 2     |",
+        "| 2b      | 2021-01-01T00:00:01.000000020Z | 6   |       |",
+        "| 2b      | 2021-01-01T00:00:01.000000030Z | 1.2 |       |",
+        "+---------+--------------------------------+-----+-------+",
+    ];
+
+    run_read_window_aggregate_test_case(
+        MeasurementForDefect2697 {},
+        predicate,
+        agg,
+        every,
+        offset,
+        expected_results,
+    )
+    .await;
+}
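
The dispatch this patch introduces boils down to a column-layout rule: selector aggregates (MIN, MAX, FIRST, LAST) emit a `time_<field>` column per field under both plan types, while plain aggregates (SUM, COUNT, MEAN) aggregate the shared `time` column only in `read_group` plans, because `read_window_aggregate` takes `time` from the window bounds. The following is a minimal, self-contained sketch of that rule; `Aggregate`, `PlanType`, and `output_columns` here are hypothetical simplified stand-ins for illustration, not the IOx `AggExprs` API.

// Hypothetical, simplified stand-ins for illustration only (not IOx types).
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum Aggregate {
    Sum,
    Count,
    Mean,
    First,
    Last,
    Min,
    Max,
}

#[derive(Clone, Copy)]
enum PlanType {
    ReadGroup,
    ReadWindowAggregate,
}

// Returns the aggregated output column names for the given field columns,
// mirroring the comments in the diff above.
fn output_columns(agg: Aggregate, plan: PlanType, fields: &[&str]) -> Vec<String> {
    match agg {
        // Selector aggregates carry a distinct timestamp per field:
        //   agg(field) as field, agg(time) as time_<field>
        Aggregate::First | Aggregate::Last | Aggregate::Min | Aggregate::Max => fields
            .iter()
            .flat_map(|f| [f.to_string(), format!("time_{}", f)])
            .collect(),
        // Plain aggregates: a read_group plan also aggregates `time`; a
        // read_window_aggregate plan gets `time` from the window bounds.
        Aggregate::Sum | Aggregate::Count | Aggregate::Mean => {
            let mut cols: Vec<String> = fields.iter().map(|f| f.to_string()).collect();
            if matches!(plan, PlanType::ReadGroup) {
                cols.push("time".to_string());
            }
            cols
        }
    }
}

fn main() {
    let fields = ["bar", "foo"];
    // MIN under read_window_aggregate: ["bar", "time_bar", "foo", "time_foo"]
    println!("{:?}", output_columns(Aggregate::Min, PlanType::ReadWindowAggregate, &fields));
    // SUM under read_window_aggregate: ["bar", "foo"] (no aggregated time column)
    println!("{:?}", output_columns(Aggregate::Sum, PlanType::ReadWindowAggregate, &fields));
    // SUM under read_group: ["bar", "foo", "time"]
    println!("{:?}", output_columns(Aggregate::Sum, PlanType::ReadGroup, &fields));
}

Run as written, the first two prints match the column sets in the two regression tests above: the MIN test's table has `time_bar` and `time_foo` columns, while the SUM test's table has no aggregated time column at all.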