diff --git a/iox_query/src/exec/gapfill/algo/interpolate.rs b/iox_query/src/exec/gapfill/algo/interpolate.rs index 79955012f8..2ed1d49379 100644 --- a/iox_query/src/exec/gapfill/algo/interpolate.rs +++ b/iox_query/src/exec/gapfill/algo/interpolate.rs @@ -90,7 +90,6 @@ impl Cursor { .map(|seg| Segment::::try_from(seg.clone())) .transpose()?; let mut builder = InterpolateBuilder { - params, values: Vec::with_capacity(self.remaining_output_batch_size), segment, input_time_array, @@ -173,7 +172,6 @@ impl_from_segment_scalar_value!(f64); /// Implements [`VecBuilder`] for build aggregate columns whose gaps /// are being filled using linear interpolation. pub(super) struct InterpolateBuilder<'a, T: ArrowPrimitiveType> { - pub params: &'a GapFillParams, pub values: Vec>, pub segment: Option>, pub input_time_array: &'a TimestampNanosecondArray, @@ -193,27 +191,25 @@ where offset, series_end_offset, } => { - // If - // we are not at the last point - // and the distance to the next point is greater than the stride - // and both this point and the next are not null - // then create a segment that will be used to fill in the missing rows. - if offset + 1 < series_end_offset - && self.input_time_array.value(offset + 1) > ts + self.params.stride - && self.input_aggr_array.is_valid(offset) - && self.input_aggr_array.is_valid(offset + 1) - { - self.segment = Some(Segment { + if self.input_aggr_array.is_valid(offset) { + let end_offset = self.find_end_offset(offset, series_end_offset); + // Find the next non-null value in this column for the series. + // If there is one, start a new segment at the current value. + self.segment = end_offset.map(|end_offset| Segment { start_point: (ts, self.input_aggr_array.value(offset)), end_point: ( - self.input_time_array.value(offset + 1), - self.input_aggr_array.value(offset + 1), + self.input_time_array.value(end_offset), + self.input_aggr_array.value(end_offset), ), - }) + }); + self.copy_point(offset); } else { - self.segment = None; + self.values.push( + self.segment + .as_ref() + .map(|seg| T::Native::interpolate(seg, ts)), + ); } - self.copy_point(offset); } RowStatus::Missing { ts, .. } => self.values.push( self.segment @@ -243,6 +239,17 @@ where .then_some(self.input_aggr_array.value(offset)); self.values.push(v) } + + /// Scan forward to find the endpoint for a segment that starts at `start_offset`. + /// Skip over any null values. + /// + /// We are guaranteed to have buffered enough input to find the next non-null point for this series, + /// if there is one, by the logic in [`BufferedInput`]. + /// + /// [`BufferedInput`]: super::super::buffered_input::BufferedInput + fn find_end_offset(&self, start_offset: usize, series_end_offset: usize) -> Option { + ((start_offset + 1)..series_end_offset).find(|&i| self.input_aggr_array.is_valid(i)) + } } /// A trait for the native numeric types that can be interpolated @@ -375,8 +382,8 @@ mod test { - "| 1970-01-01T00:00:00.000001200Z | 133 |" - "| 1970-01-01T00:00:00.000001300Z | 166 |" - "| 1970-01-01T00:00:00.000001400Z | 200 |" - - "| 1970-01-01T00:00:00.000001500Z | |" - - "| 1970-01-01T00:00:00.000001600Z | |" + - "| 1970-01-01T00:00:00.000001500Z | 466 |" + - "| 1970-01-01T00:00:00.000001600Z | 733 |" - "| 1970-01-01T00:00:00.000001700Z | 1000 |" - "| 1970-01-01T00:00:00.000001800Z | 500 |" - "| 1970-01-01T00:00:00.000001900Z | 0 |" @@ -447,8 +454,8 @@ mod test { - "| 1970-01-01T00:00:00.000001200Z | 133 |" - "| 1970-01-01T00:00:00.000001300Z | 166 |" - "| 1970-01-01T00:00:00.000001400Z | 200 |" - - "| 1970-01-01T00:00:00.000001500Z | |" - - "| 1970-01-01T00:00:00.000001600Z | |" + - "| 1970-01-01T00:00:00.000001500Z | 466 |" + - "| 1970-01-01T00:00:00.000001600Z | 733 |" - "| 1970-01-01T00:00:00.000001700Z | 1000 |" - "| 1970-01-01T00:00:00.000001800Z | 500 |" - "| 1970-01-01T00:00:00.000001900Z | 0 |" @@ -519,8 +526,8 @@ mod test { - "| 1970-01-01T00:00:00.000001200Z | 200.0 |" - "| 1970-01-01T00:00:00.000001300Z | 300.0 |" - "| 1970-01-01T00:00:00.000001400Z | 400.0 |" - - "| 1970-01-01T00:00:00.000001500Z | |" - - "| 1970-01-01T00:00:00.000001600Z | |" + - "| 1970-01-01T00:00:00.000001500Z | 600.0 |" + - "| 1970-01-01T00:00:00.000001600Z | 800.0 |" - "| 1970-01-01T00:00:00.000001700Z | 1000.0 |" - "| 1970-01-01T00:00:00.000001800Z | 500.0 |" - "| 1970-01-01T00:00:00.000001900Z | 0.0 |" diff --git a/iox_query/src/exec/gapfill/exec_tests.rs b/iox_query/src/exec/gapfill/exec_tests.rs index e33db776ff..bcb674831f 100644 --- a/iox_query/src/exec/gapfill/exec_tests.rs +++ b/iox_query/src/exec/gapfill/exec_tests.rs @@ -775,6 +775,7 @@ fn test_gapfill_fill_interpolate() { Some("b"), Some("b"), Some("b"), + Some("b"), ]], time_col: vec![ None, @@ -788,7 +789,7 @@ fn test_gapfill_fill_interpolate() { // --- new series None, Some(975), - // 1000 + Some(1000), Some(1025), // 1050 Some(1075), @@ -807,7 +808,7 @@ fn test_gapfill_fill_interpolate() { // --- new series Some(-10), Some(1100), // 975 - // 1200 1000 + None, // 1200 1000 (this null value will be filled) Some(1300), // 1025 // 1325 1050 Some(1350), // 1075