Merge pull request #2836 from influxdata/er/fix/storage_grpc/2697

fix: use correct timestamps for selectors in read_window_aggregate
pull/24376/head
kodiakhq[bot] 2021-10-14 08:31:58 +00:00 committed by GitHub
commit 0829711fe3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 241 additions and 78 deletions

View File

@ -1070,7 +1070,7 @@ impl InfluxRpcPlanner {
let AggExprs {
agg_exprs,
field_columns,
} = AggExprs::try_new(agg, &schema, &predicate)?;
} = AggExprs::try_new_for_read_group(agg, &schema, &predicate)?;
let plan_builder = plan_builder
.aggregate(group_exprs, agg_exprs)
@ -1190,10 +1190,10 @@ impl InfluxRpcPlanner {
.chain(std::iter::once(window_bound))
.collect::<Vec<_>>();
// aggregate each field
let agg_exprs = filtered_fields_iter(&schema, &predicate)
.map(|field| make_agg_expr(agg, field.name()))
.collect::<Result<Vec<_>>>()?;
let AggExprs {
agg_exprs,
field_columns,
} = AggExprs::try_new_for_read_window_aggregate(agg, &schema, &predicate)?;
// sort by the group by expressions as well
let sort_exprs = group_exprs
@ -1215,21 +1215,12 @@ impl InfluxRpcPlanner {
.map(|field| Arc::from(field.name().as_str()))
.collect();
let field_columns = filtered_fields_iter(&schema, &predicate)
.map(|field| Arc::from(field.name().as_str()))
.collect();
// TODO: remove the use of tag_columns and field_column names
// and instead use the schema directly)
let ss_plan = SeriesSetPlan::new_from_shared_timestamp(
Ok(Some(SeriesSetPlan::new(
Arc::from(table_name),
plan,
tag_columns,
field_columns,
);
Ok(Some(ss_plan))
)))
}
/// Create a plan that scans the specified table, and applies any
@ -1461,75 +1452,144 @@ fn filtered_fields_iter<'a>(
/// the rules explained on `read_group_plan`
impl AggExprs {
/// Create the appropriate aggregate expressions, based on the type of the
/// field
pub fn try_new(agg: Aggregate, schema: &Schema, predicate: &Predicate) -> Result<Self> {
/// field for a `read_group` plan.
pub fn try_new_for_read_group(
agg: Aggregate,
schema: &Schema,
predicate: &Predicate,
) -> Result<Self> {
match agg {
Aggregate::Sum | Aggregate::Count | Aggregate::Mean => {
// agg_function(_val1) as _value1
// ...
// agg_function(_valN) as _valueN
// agg_function(time) as time
let agg_exprs = filtered_fields_iter(schema, predicate)
.chain(schema.time_iter())
.map(|field| make_agg_expr(agg, field.name()))
.collect::<Result<Vec<_>>>()?;
let field_columns = filtered_fields_iter(schema, predicate)
.map(|field| Arc::from(field.name().as_str()))
.collect::<Vec<_>>()
.into();
Ok(Self {
agg_exprs,
field_columns,
})
Self::agg_for_read_group(agg, schema, predicate)
}
Aggregate::First | Aggregate::Last | Aggregate::Min | Aggregate::Max => {
// agg_function(_val1) as _value1
// agg_function(time) as time1
// ..
// agg_function(_valN) as _valueN
// agg_function(time) as timeN
// might be nice to use a more functional style here
let mut agg_exprs = Vec::new();
let mut field_list = Vec::new();
for field in filtered_fields_iter(schema, predicate) {
agg_exprs.push(make_selector_expr(
agg,
SelectorOutput::Value,
field.name(),
field.data_type(),
field.name(),
)?);
let time_column_name = format!("{}_{}", TIME_COLUMN_NAME, field.name());
agg_exprs.push(make_selector_expr(
agg,
SelectorOutput::Time,
field.name(),
field.data_type(),
&time_column_name,
)?);
field_list.push((
Arc::from(field.name().as_str()), // value name
Arc::from(time_column_name.as_str()),
));
}
let field_columns = field_list.into();
Ok(Self {
agg_exprs,
field_columns,
})
Self::selector_aggregates(agg, schema, predicate)
}
Aggregate::None => InternalUnexpectedNoneAggregate.fail(),
}
}
/// Create the appropriate aggregate expressions, based on the type of the
/// field for a `read_window_aggregate` plan.
pub fn try_new_for_read_window_aggregate(
agg: Aggregate,
schema: &Schema,
predicate: &Predicate,
) -> Result<Self> {
match agg {
Aggregate::Sum | Aggregate::Count | Aggregate::Mean => {
Self::agg_for_read_window_aggregate(agg, schema, predicate)
}
Aggregate::First | Aggregate::Last | Aggregate::Min | Aggregate::Max => {
Self::selector_aggregates(agg, schema, predicate)
}
Aggregate::None => InternalUnexpectedNoneAggregate.fail(),
}
}
// Creates special aggregate "selector" expressions for the fields in the
// provided schema. Selectors ensure that the relevant aggregate functions
// are also provided to a distinct time column for each field column.
//
// Equivalent SQL would look like:
//
// agg_function(_val1) as _value1
// agg_function(time) as time1
// ..
// agg_function(_valN) as _valueN
// agg_function(time) as timeN
fn selector_aggregates(agg: Aggregate, schema: &Schema, predicate: &Predicate) -> Result<Self> {
// might be nice to use a more functional style here
let mut agg_exprs = Vec::new();
let mut field_list = Vec::new();
for field in filtered_fields_iter(schema, predicate) {
agg_exprs.push(make_selector_expr(
agg,
SelectorOutput::Value,
field.name(),
field.data_type(),
field.name(),
)?);
let time_column_name = format!("{}_{}", TIME_COLUMN_NAME, field.name());
agg_exprs.push(make_selector_expr(
agg,
SelectorOutput::Time,
field.name(),
field.data_type(),
&time_column_name,
)?);
field_list.push((
Arc::from(field.name().as_str()), // value name
Arc::from(time_column_name.as_str()),
));
}
let field_columns = field_list.into();
Ok(Self {
agg_exprs,
field_columns,
})
}
// Creates aggregate expressions for use in a read_group plan, which
// includes the time column.
//
// Equivalent SQL would look like this:
//
// agg_function(_val1) as _value1
// ...
// agg_function(_valN) as _valueN
// agg_function(time) as time
fn agg_for_read_group(agg: Aggregate, schema: &Schema, predicate: &Predicate) -> Result<Self> {
let agg_exprs = filtered_fields_iter(schema, predicate)
.chain(schema.time_iter())
.map(|field| make_agg_expr(agg, field.name()))
.collect::<Result<Vec<_>>>()?;
let field_columns = filtered_fields_iter(schema, predicate)
.map(|field| Arc::from(field.name().as_str()))
.collect::<Vec<_>>()
.into();
Ok(Self {
agg_exprs,
field_columns,
})
}
// Creates aggregate expressions for use in a read_window_aggregate plan. No
// aggregates are created for the time column because the
// `read_window_aggregate` uses a time column calculated using window
// bounds.
//
// Equivalent SQL would look like this:
//
// agg_function(_val1) as _value1
// ...
// agg_function(_valN) as _valueN
fn agg_for_read_window_aggregate(
agg: Aggregate,
schema: &Schema,
predicate: &Predicate,
) -> Result<Self> {
let agg_exprs = filtered_fields_iter(schema, predicate)
.map(|field| make_agg_expr(agg, field.name()))
.collect::<Result<Vec<_>>>()?;
let field_columns = filtered_fields_iter(schema, predicate)
.map(|field| Arc::from(field.name().as_str()))
.collect::<Vec<_>>()
.into();
Ok(Self {
agg_exprs,
field_columns,
})
}
}
/// Creates a DataFusion expression suitable for calculating an aggregate:

View File

@ -1,5 +1,5 @@
//! Tests for the Influx gRPC queries
use crate::scenarios::*;
use crate::scenarios::{util::all_scenarios_for_one_chunk, *};
use server::{db::test_helpers::write_lp, utils::make_db};
@ -272,3 +272,106 @@ async fn test_read_window_aggregate_months() {
)
.await;
}
// Test data to validate fix for:
// https://github.com/influxdata/influxdb_iox/issues/2697
struct MeasurementForDefect2697 {}
#[async_trait]
impl DbSetup for MeasurementForDefect2697 {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "2021-01-01T00";
let lp = vec![
"mm,section=1a bar=5.0 1609459201000000011",
"mm,section=1a bar=0.28 1609459201000000031",
"mm,section=2b bar=4.0 1609459201000000009",
"mm,section=2b bar=6.0 1609459201000000015",
"mm,section=2b bar=1.2 1609459201000000022",
"mm,section=1a foo=1.0 1609459201000000001",
"mm,section=1a foo=3.0 1609459201000000005",
"mm,section=1a foo=11.24 1609459201000000024",
"mm,section=2b foo=2.0 1609459201000000002",
];
all_scenarios_for_one_chunk(vec![], vec![], lp, "mm", partition_key).await
}
}
// See https://github.com/influxdata/influxdb_iox/issues/2697
#[tokio::test]
async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() {
let predicate = PredicateBuilder::default()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.timestamp_range(1609459201000000001, 1609459201000000031)
.build();
let agg = Aggregate::Min;
let every = WindowDuration::from_nanoseconds(10);
let offset = WindowDuration::from_nanoseconds(0);
// Because the windowed aggregate is using a selector aggregate (one of MIN,
// MAX, FIRST, LAST) we need to run a plan that brings along the timestamps
// for the chosen aggregate in the window.
//
// The window is defined by the `time` column
let expected_results = vec![
"+---------+--------------------------------+-----+--------------------------------+-------+--------------------------------+",
"| section | time | bar | time_bar | foo | time_foo |",
"+---------+--------------------------------+-----+--------------------------------+-------+--------------------------------+",
"| 1a | 2021-01-01T00:00:01.000000010Z | | | 1 | 2021-01-01T00:00:01.000000001Z |",
"| 1a | 2021-01-01T00:00:01.000000020Z | 5 | 2021-01-01T00:00:01.000000011Z | | |",
"| 1a | 2021-01-01T00:00:01.000000030Z | | | 11.24 | 2021-01-01T00:00:01.000000024Z |",
"| 2b | 2021-01-01T00:00:01.000000010Z | 4 | 2021-01-01T00:00:01.000000009Z | 2 | 2021-01-01T00:00:01.000000002Z |",
"| 2b | 2021-01-01T00:00:01.000000020Z | 6 | 2021-01-01T00:00:01.000000015Z | | |",
"| 2b | 2021-01-01T00:00:01.000000030Z | 1.2 | 2021-01-01T00:00:01.000000022Z | | |",
"+---------+--------------------------------+-----+--------------------------------+-------+--------------------------------+",
];
run_read_window_aggregate_test_case(
MeasurementForDefect2697 {},
predicate,
agg,
every,
offset,
expected_results,
)
.await;
}
// See https://github.com/influxdata/influxdb_iox/issues/2697
#[tokio::test]
async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697() {
let predicate = PredicateBuilder::default()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.timestamp_range(1609459201000000001, 1609459201000000031)
.build();
let agg = Aggregate::Sum;
let every = WindowDuration::from_nanoseconds(10);
let offset = WindowDuration::from_nanoseconds(0);
// The windowed aggregate is using a non-selector aggregate (SUM, COUNT, MEAD).
// For each distinct series the window defines the `time` column
let expected_results = vec![
"+---------+--------------------------------+-----+-------+",
"| section | time | bar | foo |",
"+---------+--------------------------------+-----+-------+",
"| 1a | 2021-01-01T00:00:01.000000010Z | | 4 |",
"| 1a | 2021-01-01T00:00:01.000000020Z | 5 | |",
"| 1a | 2021-01-01T00:00:01.000000030Z | | 11.24 |",
"| 2b | 2021-01-01T00:00:01.000000010Z | 4 | 2 |",
"| 2b | 2021-01-01T00:00:01.000000020Z | 6 | |",
"| 2b | 2021-01-01T00:00:01.000000030Z | 1.2 | |",
"+---------+--------------------------------+-----+-------+",
];
run_read_window_aggregate_test_case(
MeasurementForDefect2697 {},
predicate,
agg,
every,
offset,
expected_results,
)
.await;
}