fix: Only use timestamps in first/last when there is a corresponding … (#2988)
* fix: Only use timestamps in first/last when there is a corresponding value * docs: Fix broken English in comment * docs: clarify expectations in testpull/24376/head
parent
6473498cd5
commit
5e86989990
|
@ -147,6 +147,25 @@ macro_rules! make_first_selector {
|
|||
// the input type arguments should be ensured by datafusion
|
||||
.expect("Second argument was time");
|
||||
|
||||
// Only look for times where the array also has a non
|
||||
// null value (the time array should have no nulls itself)
|
||||
//
|
||||
// For example, for the following input, the correct
|
||||
// current min time is 200 (not 100)
|
||||
//
|
||||
// value | time
|
||||
// --------------
|
||||
// NULL | 100
|
||||
// A | 200
|
||||
// B | 300
|
||||
//
|
||||
// Note this could likely be faster if we used `ArrayData` APIs
|
||||
let time_arr: TimestampNanosecondArray = time_arr
|
||||
.iter()
|
||||
.zip(value_arr.iter())
|
||||
.map(|(ts, value)| if value.is_some() { ts } else { None })
|
||||
.collect();
|
||||
|
||||
let cur_min_time = $MINFUNC(&time_arr);
|
||||
|
||||
let need_update = match (&self.time, &cur_min_time) {
|
||||
|
@ -235,6 +254,25 @@ macro_rules! make_last_selector {
|
|||
// the input type arguments should be ensured by datafusion
|
||||
.expect("Second argument was time");
|
||||
|
||||
// Only look for times where the array also has a non
|
||||
// null value (the time array should have no nulls itself)
|
||||
//
|
||||
// For example, for the following input, the correct
|
||||
// current max time is 200 (not 300)
|
||||
//
|
||||
// value | time
|
||||
// --------------
|
||||
// A | 100
|
||||
// B | 200
|
||||
// NULL | 300
|
||||
//
|
||||
// Note this could likely be faster if we used `ArrayData` APIs
|
||||
let time_arr: TimestampNanosecondArray = time_arr
|
||||
.iter()
|
||||
.zip(value_arr.iter())
|
||||
.map(|(ts, value)| if value.is_some() { ts } else { None })
|
||||
.collect();
|
||||
|
||||
let cur_max_time = $MAXFUNC(&time_arr);
|
||||
|
||||
let need_update = match (&self.time, &cur_max_time) {
|
||||
|
|
|
@ -3,7 +3,7 @@ use crate::{
|
|||
influxrpc::util::run_series_set_plan,
|
||||
scenarios::{
|
||||
make_two_chunk_scenarios, util::all_scenarios_for_one_chunk, DbScenario, DbSetup, NoData,
|
||||
TwoMeasurementsManyFieldsOneChunk,
|
||||
TwoMeasurementsManyFields, TwoMeasurementsManyFieldsOneChunk,
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -540,6 +540,39 @@ async fn test_grouped_series_set_plan_first() {
|
|||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_grouped_series_set_plan_first_with_nulls() {
|
||||
let predicate = PredicateBuilder::default()
|
||||
// return three rows, but one series
|
||||
// "h2o,state=MA,city=Boston temp=70.4 50",
|
||||
// "h2o,state=MA,city=Boston other_temp=70.4 250",
|
||||
// "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000"
|
||||
.table("h2o")
|
||||
.add_expr(col("state").eq(lit("MA")))
|
||||
.add_expr(col("city").eq(lit("Boston")))
|
||||
.build();
|
||||
|
||||
let agg = Aggregate::First;
|
||||
let group_columns = vec!["state"];
|
||||
|
||||
// expect timestamps to be present for all three series
|
||||
let expected_results = vec![
|
||||
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
|
||||
"Series tags={_field=moisture, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [43.0]",
|
||||
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]",
|
||||
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [50], values: [70.4]",
|
||||
];
|
||||
|
||||
run_read_group_test_case(
|
||||
TwoMeasurementsManyFields {},
|
||||
predicate,
|
||||
agg,
|
||||
group_columns,
|
||||
expected_results,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_grouped_series_set_plan_last() {
|
||||
let predicate = PredicateBuilder::default()
|
||||
|
@ -568,6 +601,39 @@ async fn test_grouped_series_set_plan_last() {
|
|||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_grouped_series_set_plan_last_with_nulls() {
|
||||
let predicate = PredicateBuilder::default()
|
||||
// return two three:
|
||||
// "h2o,state=MA,city=Boston temp=70.4 50",
|
||||
// "h2o,state=MA,city=Boston other_temp=70.4 250",
|
||||
// "h2o,state=MA,city=Boston temp=70.4,moisture=43.0 100000"
|
||||
.table("h2o")
|
||||
.add_expr(col("state").eq(lit("MA")))
|
||||
.add_expr(col("city").eq(lit("Boston")))
|
||||
.build();
|
||||
|
||||
let agg = Aggregate::Last;
|
||||
let group_columns = vec!["state"];
|
||||
|
||||
// expect timestamps to be present for all three series
|
||||
let expected_results = vec![
|
||||
"Group tag_keys: _field, _measurement, city, state partition_key_vals: MA",
|
||||
"Series tags={_field=moisture, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [43.0]",
|
||||
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]",
|
||||
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [70.4]",
|
||||
];
|
||||
|
||||
run_read_group_test_case(
|
||||
TwoMeasurementsManyFields {},
|
||||
predicate,
|
||||
agg,
|
||||
group_columns,
|
||||
expected_results,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
struct MeasurementForMin {}
|
||||
#[async_trait]
|
||||
impl DbSetup for MeasurementForMin {
|
||||
|
|
Loading…
Reference in New Issue