fix: make interpolate() fill null values in input (#7490)
* fix: make interpolate() fill null values in input * chore: cargo docpull/24376/head
parent
3e60369eff
commit
0937615dba
|
@ -90,7 +90,6 @@ impl Cursor {
|
|||
.map(|seg| Segment::<T::Native>::try_from(seg.clone()))
|
||||
.transpose()?;
|
||||
let mut builder = InterpolateBuilder {
|
||||
params,
|
||||
values: Vec::with_capacity(self.remaining_output_batch_size),
|
||||
segment,
|
||||
input_time_array,
|
||||
|
@ -173,7 +172,6 @@ impl_from_segment_scalar_value!(f64);
|
|||
/// Implements [`VecBuilder`] for build aggregate columns whose gaps
|
||||
/// are being filled using linear interpolation.
|
||||
pub(super) struct InterpolateBuilder<'a, T: ArrowPrimitiveType> {
|
||||
pub params: &'a GapFillParams,
|
||||
pub values: Vec<Option<T::Native>>,
|
||||
pub segment: Option<Segment<T::Native>>,
|
||||
pub input_time_array: &'a TimestampNanosecondArray,
|
||||
|
@ -193,27 +191,25 @@ where
|
|||
offset,
|
||||
series_end_offset,
|
||||
} => {
|
||||
// If
|
||||
// we are not at the last point
|
||||
// and the distance to the next point is greater than the stride
|
||||
// and both this point and the next are not null
|
||||
// then create a segment that will be used to fill in the missing rows.
|
||||
if offset + 1 < series_end_offset
|
||||
&& self.input_time_array.value(offset + 1) > ts + self.params.stride
|
||||
&& self.input_aggr_array.is_valid(offset)
|
||||
&& self.input_aggr_array.is_valid(offset + 1)
|
||||
{
|
||||
self.segment = Some(Segment {
|
||||
if self.input_aggr_array.is_valid(offset) {
|
||||
let end_offset = self.find_end_offset(offset, series_end_offset);
|
||||
// Find the next non-null value in this column for the series.
|
||||
// If there is one, start a new segment at the current value.
|
||||
self.segment = end_offset.map(|end_offset| Segment {
|
||||
start_point: (ts, self.input_aggr_array.value(offset)),
|
||||
end_point: (
|
||||
self.input_time_array.value(offset + 1),
|
||||
self.input_aggr_array.value(offset + 1),
|
||||
self.input_time_array.value(end_offset),
|
||||
self.input_aggr_array.value(end_offset),
|
||||
),
|
||||
})
|
||||
});
|
||||
self.copy_point(offset);
|
||||
} else {
|
||||
self.segment = None;
|
||||
self.values.push(
|
||||
self.segment
|
||||
.as_ref()
|
||||
.map(|seg| T::Native::interpolate(seg, ts)),
|
||||
);
|
||||
}
|
||||
self.copy_point(offset);
|
||||
}
|
||||
RowStatus::Missing { ts, .. } => self.values.push(
|
||||
self.segment
|
||||
|
@ -243,6 +239,17 @@ where
|
|||
.then_some(self.input_aggr_array.value(offset));
|
||||
self.values.push(v)
|
||||
}
|
||||
|
||||
/// Scan forward to find the endpoint for a segment that starts at `start_offset`.
|
||||
/// Skip over any null values.
|
||||
///
|
||||
/// We are guaranteed to have buffered enough input to find the next non-null point for this series,
|
||||
/// if there is one, by the logic in [`BufferedInput`].
|
||||
///
|
||||
/// [`BufferedInput`]: super::super::buffered_input::BufferedInput
|
||||
fn find_end_offset(&self, start_offset: usize, series_end_offset: usize) -> Option<usize> {
|
||||
((start_offset + 1)..series_end_offset).find(|&i| self.input_aggr_array.is_valid(i))
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait for the native numeric types that can be interpolated
|
||||
|
@ -375,8 +382,8 @@ mod test {
|
|||
- "| 1970-01-01T00:00:00.000001200Z | 133 |"
|
||||
- "| 1970-01-01T00:00:00.000001300Z | 166 |"
|
||||
- "| 1970-01-01T00:00:00.000001400Z | 200 |"
|
||||
- "| 1970-01-01T00:00:00.000001500Z | |"
|
||||
- "| 1970-01-01T00:00:00.000001600Z | |"
|
||||
- "| 1970-01-01T00:00:00.000001500Z | 466 |"
|
||||
- "| 1970-01-01T00:00:00.000001600Z | 733 |"
|
||||
- "| 1970-01-01T00:00:00.000001700Z | 1000 |"
|
||||
- "| 1970-01-01T00:00:00.000001800Z | 500 |"
|
||||
- "| 1970-01-01T00:00:00.000001900Z | 0 |"
|
||||
|
@ -447,8 +454,8 @@ mod test {
|
|||
- "| 1970-01-01T00:00:00.000001200Z | 133 |"
|
||||
- "| 1970-01-01T00:00:00.000001300Z | 166 |"
|
||||
- "| 1970-01-01T00:00:00.000001400Z | 200 |"
|
||||
- "| 1970-01-01T00:00:00.000001500Z | |"
|
||||
- "| 1970-01-01T00:00:00.000001600Z | |"
|
||||
- "| 1970-01-01T00:00:00.000001500Z | 466 |"
|
||||
- "| 1970-01-01T00:00:00.000001600Z | 733 |"
|
||||
- "| 1970-01-01T00:00:00.000001700Z | 1000 |"
|
||||
- "| 1970-01-01T00:00:00.000001800Z | 500 |"
|
||||
- "| 1970-01-01T00:00:00.000001900Z | 0 |"
|
||||
|
@ -519,8 +526,8 @@ mod test {
|
|||
- "| 1970-01-01T00:00:00.000001200Z | 200.0 |"
|
||||
- "| 1970-01-01T00:00:00.000001300Z | 300.0 |"
|
||||
- "| 1970-01-01T00:00:00.000001400Z | 400.0 |"
|
||||
- "| 1970-01-01T00:00:00.000001500Z | |"
|
||||
- "| 1970-01-01T00:00:00.000001600Z | |"
|
||||
- "| 1970-01-01T00:00:00.000001500Z | 600.0 |"
|
||||
- "| 1970-01-01T00:00:00.000001600Z | 800.0 |"
|
||||
- "| 1970-01-01T00:00:00.000001700Z | 1000.0 |"
|
||||
- "| 1970-01-01T00:00:00.000001800Z | 500.0 |"
|
||||
- "| 1970-01-01T00:00:00.000001900Z | 0.0 |"
|
||||
|
|
|
@ -775,6 +775,7 @@ fn test_gapfill_fill_interpolate() {
|
|||
Some("b"),
|
||||
Some("b"),
|
||||
Some("b"),
|
||||
Some("b"),
|
||||
]],
|
||||
time_col: vec![
|
||||
None,
|
||||
|
@ -788,7 +789,7 @@ fn test_gapfill_fill_interpolate() {
|
|||
// --- new series
|
||||
None,
|
||||
Some(975),
|
||||
// 1000
|
||||
Some(1000),
|
||||
Some(1025),
|
||||
// 1050
|
||||
Some(1075),
|
||||
|
@ -807,7 +808,7 @@ fn test_gapfill_fill_interpolate() {
|
|||
// --- new series
|
||||
Some(-10),
|
||||
Some(1100), // 975
|
||||
// 1200 1000
|
||||
None, // 1200 1000 (this null value will be filled)
|
||||
Some(1300), // 1025
|
||||
// 1325 1050
|
||||
Some(1350), // 1075
|
||||
|
|
Loading…
Reference in New Issue