refactor: make null-as-missing default behavior for LOCF (#7443)

* refactor: make null-as-missing default behavior for LOCF

* test: update InfluxQL test

---------

Co-authored-by: Christopher Wolff <cwolff@athena.tail244ec.ts.net>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Christopher M. Wolff 2023-04-04 11:03:09 -07:00 committed by GitHub
parent 099b33871b
commit d57a4f8947
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 65 additions and 27 deletions

View File

@@ -66,3 +66,12 @@ from cpu
where time between timestamp '2000-05-05T12:15:00Z' and timestamp '2000-05-05T12:59:00Z'
group by region, minute;
-- cpu.idle has a null value at 12:31. It should propagate the value from 12:20 forward,
-- overwriting the null value.
SELECT
date_bin_gapfill(interval '1 minute', time, timestamp '1970-01-01T00:00:00Z') as minute,
locf(min(cpu.idle))
from cpu
where time between timestamp '2000-05-05T12:19:00Z' and timestamp '2000-05-05T12:40:00Z'
group by minute;

View File

@@ -19,7 +19,7 @@
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@0 >= 957528000000000000 AND time@0 <= 957531540000000000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@1 >= 957528000000000000 AND time@1 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[time@0 ASC], projection=[time, user] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[time@0 ASC], projection=[time, user] |
| | |
----------
-- SQL: SELECT date_bin_gapfill(interval '10 minute', time, timestamp '1970-01-01T00:00:00Z') as minute, count(cpu.user) from cpu where time between timestamp '2000-05-05T12:00:00Z' and timestamp '2000-05-05T12:59:00Z' group by minute;
@@ -100,7 +100,7 @@
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@1 >= 957528000000000000 AND time@1 <= 957531540000000000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@1 >= 957528000000000000 AND time@1 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[region@0 ASC, time@1 ASC], projection=[region, time, user] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[region@0 ASC, time@1 ASC], projection=[region, time, user] |
| | |
----------
-- SQL: SELECT region, date_bin_gapfill(interval '5 minute', time, timestamp '1970-01-01T00:00:00Z') as minute, locf(min(cpu.user)) from cpu where time between timestamp '2000-05-05T12:15:00Z' and timestamp '2000-05-05T12:59:00Z' group by region, minute;
@@ -125,4 +125,31 @@
| b | 2000-05-05T12:45:00Z | 28.9 |
| b | 2000-05-05T12:50:00Z | 28.9 |
| b | 2000-05-05T12:55:00Z | 28.9 |
+--------+----------------------+---------------+
+--------+----------------------+---------------+
-- SQL: SELECT date_bin_gapfill(interval '1 minute', time, timestamp '1970-01-01T00:00:00Z') as minute, locf(min(cpu.idle)) from cpu where time between timestamp '2000-05-05T12:19:00Z' and timestamp '2000-05-05T12:40:00Z' group by minute;
+----------------------+---------------+
| minute | MIN(cpu.idle) |
+----------------------+---------------+
| 2000-05-05T12:19:00Z | |
| 2000-05-05T12:20:00Z | 70.0 |
| 2000-05-05T12:21:00Z | 70.0 |
| 2000-05-05T12:22:00Z | 70.0 |
| 2000-05-05T12:23:00Z | 70.0 |
| 2000-05-05T12:24:00Z | 70.0 |
| 2000-05-05T12:25:00Z | 70.0 |
| 2000-05-05T12:26:00Z | 70.0 |
| 2000-05-05T12:27:00Z | 70.0 |
| 2000-05-05T12:28:00Z | 70.0 |
| 2000-05-05T12:29:00Z | 70.0 |
| 2000-05-05T12:30:00Z | 70.0 |
| 2000-05-05T12:31:00Z | 70.0 |
| 2000-05-05T12:32:00Z | 70.0 |
| 2000-05-05T12:33:00Z | 70.0 |
| 2000-05-05T12:34:00Z | 70.0 |
| 2000-05-05T12:35:00Z | 70.0 |
| 2000-05-05T12:36:00Z | 70.0 |
| 2000-05-05T12:37:00Z | 70.0 |
| 2000-05-05T12:38:00Z | 70.0 |
| 2000-05-05T12:39:00Z | 60.0 |
| 2000-05-05T12:40:00Z | 60.0 |
+----------------------+---------------+

View File

@@ -543,10 +543,10 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"cpu,region=a user=23.2 957529200000000000", // 2000-05-05T12:20:00Z
"cpu,region=a user=21.0 957530400000000000", // 2000-05-05T12:40:00Z
"cpu,region=b user=25.2 957529860000000000", // 2000-05-05T12:31:00Z
"cpu,region=b user=28.9 957530340000000000", // 2000-05-05T12:39:00Z
"cpu,region=a user=23.2,idle=70.0 957529200000000000", // 2000-05-05T12:20:00Z
"cpu,region=b user=25.2 957529860000000000", // 2000-05-05T12:31:00Z
"cpu,region=b user=28.9,idle=60.0 957530340000000000", // 2000-05-05T12:39:00Z
"cpu,region=a user=21.0 957530400000000000", // 2000-05-05T12:40:00Z
]
.join("\n"),
),

View File

@@ -520,7 +520,7 @@ impl Cursor {
AggrColState::Null => {
self.build_aggr_fill_null(params, series_ends, input_time_array, input_aggr_array)
}
AggrColState::Prev { .. } | AggrColState::PrevNullAsMissing { .. } => {
AggrColState::PrevNullAsIntentional { .. } | AggrColState::PrevNullAsMissing { .. } => {
self.build_aggr_fill_prev(params, series_ends, input_time_array, input_aggr_array)
}
AggrColState::PrevNullAsMissingStashed { .. } => self.build_aggr_fill_prev_stashed(
@@ -633,7 +633,7 @@ impl Cursor {
..
} = aggr_builder;
self.set_aggr_col_state(match null_as_missing {
false => AggrColState::Prev {
false => AggrColState::PrevNullAsIntentional {
offset: prev_offset,
},
true => AggrColState::PrevNullAsMissing {
@@ -801,8 +801,8 @@ impl Cursor {
enum AggrColState {
/// For [FillStrategy::Null] there is no state to maintain.
Null,
/// For [FillStrategy::Prev].
Prev { offset: Option<u64> },
/// For [FillStrategy::PrevNullAsIntentional].
PrevNullAsIntentional { offset: Option<u64> },
/// For [FillStrategy::PrevNullAsMissing].
PrevNullAsMissing { offset: Option<u64> },
/// For [FillStrategy::PrevNullAsMissing], when
@@ -823,7 +823,7 @@ impl AggrColState {
fn new(fill_strategy: &FillStrategy) -> Self {
match fill_strategy {
FillStrategy::Null => Self::Null,
FillStrategy::Prev => Self::Prev { offset: None },
FillStrategy::PrevNullAsIntentional => Self::PrevNullAsIntentional { offset: None },
FillStrategy::PrevNullAsMissing => Self::PrevNullAsMissing { offset: None },
FillStrategy::LinearInterpolate => Self::LinearInterpolate(None),
}
@@ -833,11 +833,11 @@ impl AggrColState {
///
/// # Panics
///
/// This method will panic if `self` is not [AggrColState::Prev]
/// This method will panic if `self` is not [AggrColState::PrevNullAsIntentional]
/// or [AggrColState::PrevNullAsMissing].
fn prev_offset(&self) -> Option<u64> {
match self {
Self::Prev { offset } | Self::PrevNullAsMissing { offset } => *offset,
Self::PrevNullAsIntentional { offset } | Self::PrevNullAsMissing { offset } => *offset,
_ => unreachable!(),
}
}
@@ -853,9 +853,8 @@ impl AggrColState {
let stash = StashedAggrBuilder::create_stash(array, *v)?;
*self = Self::PrevNullAsMissingStashed { stash };
}
Self::Prev { offset: Some(v) } | Self::PrevNullAsMissing { offset: Some(v) } => {
*v -= offset
}
Self::PrevNullAsIntentional { offset: Some(v) }
| Self::PrevNullAsMissing { offset: Some(v) } => *v -= offset,
_ => (),
};
Ok(())
@@ -1596,7 +1595,7 @@ mod tests {
}
fn prev_fill_strategy(idx: usize) -> HashMap<usize, FillStrategy> {
std::iter::once((idx, FillStrategy::Prev)).collect()
std::iter::once((idx, FillStrategy::PrevNullAsIntentional)).collect()
}
fn prev_null_as_missing_fill_strategy(idx: usize) -> HashMap<usize, FillStrategy> {

View File

@@ -535,7 +535,7 @@ fn test_gapfill_fill_prev() {
]],
input_batch_size,
};
let params = get_params_ms_with_fill_strategy(&records, 25, Some(975), 1_125, FillStrategy::Prev);
let params = get_params_ms_with_fill_strategy(&records, 25, Some(975), 1_125, FillStrategy::PrevNullAsIntentional);
let tc = TestCase {
test_records: records,
output_batch_size,

View File

@@ -76,10 +76,11 @@ pub enum FillStrategy {
/// This is the InfluxQL behavior for `FILL(NULL)` or `FILL(NONE)`.
Null,
/// Fill with the most recent value in the input column.
Prev,
/// Null values in the input are preserved.
#[allow(dead_code)]
PrevNullAsIntentional,
/// Fill with the most recent non-null value in the input column.
/// This is the InfluxQL behavior for `FILL(PREVIOUS)`.
#[allow(dead_code)]
PrevNullAsMissing,
/// Fill the gaps between points linearly.
/// Null values will not be considered as missing, so two non-null values
@@ -217,8 +218,8 @@ impl UserDefinedLogicalNodeCore for GapFill {
.fill_strategy
.iter()
.map(|(e, fs)| match fs {
FillStrategy::Prev => format!("LOCF({})", e),
FillStrategy::PrevNullAsMissing => format!("LOCF(null-as-missing, {})", e),
FillStrategy::PrevNullAsIntentional => format!("LOCF(null-as-intentional, {})", e),
FillStrategy::PrevNullAsMissing => format!("LOCF({})", e),
FillStrategy::LinearInterpolate => format!("INTERPOLATE({})", e),
FillStrategy::Null => e.to_string(),
})
@@ -536,8 +537,10 @@ impl ExecutionPlan for GapFillExec {
.fill_strategy
.iter()
.map(|(e, fs)| match fs {
FillStrategy::Prev => format!("LOCF({})", e),
FillStrategy::PrevNullAsMissing => format!("LOCF(null-as-missing, {})", e),
FillStrategy::PrevNullAsIntentional => {
format!("LOCF(null-as-intentional, {})", e)
}
FillStrategy::PrevNullAsMissing => format!("LOCF({})", e),
FillStrategy::LinearInterpolate => format!("INTERPOLATE({})", e),
FillStrategy::Null => e.to_string(),
})

View File

@@ -370,7 +370,7 @@ fn handle_projection(proj: &Projection) -> Result<Option<LogicalPlan>> {
.filter_map(|e| match e {
Expr::ScalarUDF { fun, args } if fun.name == LOCF_UDF_NAME => {
let col = &args[0];
Some((col, FillStrategy::Prev))
Some((col, FillStrategy::PrevNullAsMissing))
}
_ => None,
})

View File

@@ -2081,7 +2081,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(previous)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[LOCF(null-as-missing, COUNT(data.f64_field))]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[[time]], aggr=[[LOCF(COUNT(data.f64_field))]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);