From 5e869899900f7953ab81286244ac560954429f35 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Nov 2021 14:54:43 -0400 Subject: [PATCH] =?UTF-8?q?fix:=20Only=20use=20timestamps=20in=20first/las?= =?UTF-8?q?t=20when=20there=20is=20a=20corresponding=20=E2=80=A6=20(#2988)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: Only use timestamps in first/last when there is a corresponding value * docs: Fix broken English in comment * docs: clarify expectations in test --- query/src/func/selectors/internal.rs | 38 ++++++++++++++ query_tests/src/influxrpc/read_group.rs | 68 ++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/query/src/func/selectors/internal.rs b/query/src/func/selectors/internal.rs index b38578d5bc..9afeb2fa5c 100644 --- a/query/src/func/selectors/internal.rs +++ b/query/src/func/selectors/internal.rs @@ -147,6 +147,25 @@ macro_rules! make_first_selector { // the input type arguments should be ensured by datafusion .expect("Second argument was time"); + // Only look for times where the array also has a non + // null value (the time array should have no nulls itself) + // + // For example, for the following input, the correct + // current min time is 200 (not 100) + // + // value | time + // -------------- + // NULL | 100 + // A | 200 + // B | 300 + // + // Note this could likely be faster if we used `ArrayData` APIs + let time_arr: TimestampNanosecondArray = time_arr + .iter() + .zip(value_arr.iter()) + .map(|(ts, value)| if value.is_some() { ts } else { None }) + .collect(); + let cur_min_time = $MINFUNC(&time_arr); let need_update = match (&self.time, &cur_min_time) { @@ -235,6 +254,25 @@ macro_rules! 
make_last_selector { // the input type arguments should be ensured by datafusion .expect("Second argument was time"); + // Only look for times where the array also has a non + // null value (the time array should have no nulls itself) + // + // For example, for the following input, the correct + // current max time is 200 (not 300) + // + // value | time + // -------------- + // A | 100 + // B | 200 + // NULL | 300 + // + // Note this could likely be faster if we used `ArrayData` APIs + let time_arr: TimestampNanosecondArray = time_arr + .iter() + .zip(value_arr.iter()) + .map(|(ts, value)| if value.is_some() { ts } else { None }) + .collect(); + let cur_max_time = $MAXFUNC(&time_arr); let need_update = match (&self.time, &cur_max_time) { diff --git a/query_tests/src/influxrpc/read_group.rs b/query_tests/src/influxrpc/read_group.rs index 2b287fd47a..89f59f1e8d 100644 --- a/query_tests/src/influxrpc/read_group.rs +++ b/query_tests/src/influxrpc/read_group.rs @@ -3,7 +3,7 @@ use crate::{ influxrpc::util::run_series_set_plan, scenarios::{ make_two_chunk_scenarios, util::all_scenarios_for_one_chunk, DbScenario, DbSetup, NoData, - TwoMeasurementsManyFieldsOneChunk, + TwoMeasurementsManyFields, TwoMeasurementsManyFieldsOneChunk, }, }; @@ -540,6 +540,39 @@ async fn test_grouped_series_set_plan_first() { .await; } +#[tokio::test] +async fn test_grouped_series_set_plan_first_with_nulls() { + let predicate = PredicateBuilder::default() + // return three rows, but one series + // "h2o,state=MA,city=Boston temp=70.4 50", + // "h2o,state=MA,city=Boston other_temp=70.4 250", + // "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000" + .table("h2o") + .add_expr(col("state").eq(lit("MA"))) + .add_expr(col("city").eq(lit("Boston"))) + .build(); + + let agg = Aggregate::First; + let group_columns = vec!["state"]; + + // expect timestamps to be present for all three series + let expected_results = vec![ + "Group tag_keys: _field, _measurement, city, state partition_key_vals: MA", 
+ "Series tags={_field=moisture, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [43.0]", + "Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]", + "Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [50], values: [70.4]", + ]; + + run_read_group_test_case( + TwoMeasurementsManyFields {}, + predicate, + agg, + group_columns, + expected_results, + ) + .await; +} + #[tokio::test] async fn test_grouped_series_set_plan_last() { let predicate = PredicateBuilder::default() @@ -568,6 +601,39 @@ async fn test_grouped_series_set_plan_last() { .await; } +#[tokio::test] +async fn test_grouped_series_set_plan_last_with_nulls() { + let predicate = PredicateBuilder::default() + // return three rows: + // "h2o,state=MA,city=Boston temp=70.4 50", + // "h2o,state=MA,city=Boston other_temp=70.4 250", + // "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000" + .table("h2o") + .add_expr(col("state").eq(lit("MA"))) + .add_expr(col("city").eq(lit("Boston"))) + .build(); + + let agg = Aggregate::Last; + let group_columns = vec!["state"]; + + // expect timestamps to be present for all three series + let expected_results = vec![ + "Group tag_keys: _field, _measurement, city, state partition_key_vals: MA", + "Series tags={_field=moisture, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [43.0]", + "Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]", + "Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [70.4]", + ]; + + run_read_group_test_case( + TwoMeasurementsManyFields {}, + predicate, + agg, + group_columns, + expected_results, + ) + .await; +} + struct MeasurementForMin {} #[async_trait] impl DbSetup for MeasurementForMin {