feat: add null-as-missing gap filling (#7245)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Christopher M. Wolff 2023-03-17 13:34:45 -07:00 committed by GitHub
parent 96c2094302
commit 866f9cefa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 719 additions and 578 deletions

File diff suppressed because it is too large Load Diff

View File

@ -573,6 +573,185 @@ fn test_gapfill_fill_prev() {
}}
}
/// Gap fill with `FillStrategy::PrevNullAsMissing` over two series (`a` and `b`).
///
/// The input contains null aggregate values at `a: 1075` and `b: 1050`. The
/// expected snapshot shows those rows carrying the previous non-null value of
/// the same series (10 and 20), exactly like the slots that have no input row
/// at all. The first slot of each series (975) has no earlier value and stays
/// empty.
///
/// The same snapshot is asserted for every combination of input and output
/// batch size in the loops below.
#[test]
fn test_gapfill_fill_prev_null_as_missing() {
test_helpers::maybe_start_logging();
// `allow_duplicates!` is needed because the single inline snapshot below is
// asserted once per loop iteration.
insta::allow_duplicates! {
for output_batch_size in [16, 1] {
for input_batch_size in [8, 1] {
let records = TestRecords {
// One group column; rows 0-1 belong to series "a", rows 2-4 to series "b".
group_cols: vec![vec![
Some("a"),
Some("a"),
Some("b"),
Some("b"),
Some("b"),
]],
// Commented-out values mark 25 ms slots that appear in the expected
// output but have no input row.
time_col: vec![
// 975
Some(1000),
// 1025
// 1050
Some(1075),
// 1100
// 1125
// --- new series
// 975
Some(1000),
// 1025
Some(1050),
// 1075
Some(1100),
// 1125
],
// One aggregate column, aligned row-for-row with `time_col`.
agg_cols: vec![vec![
Some(10), // a: 1000
None, // a: 1075
Some(20), // b: 1000
None, // b: 1050
Some(21), // b: 1100
]],
input_batch_size,
};
// NOTE(review): arguments are presumably stride, optional range start and
// range end in milliseconds -- this matches the snapshot (one row every
// 25 ms from 975 through 1125) but the helper is defined elsewhere; confirm.
let params = get_params_ms_with_fill_strategy(&records, 25, Some(975), 1_125, FillStrategy::PrevNullAsMissing);
let tc = TestCase {
test_records: records,
output_batch_size,
params,
};
let batches = tc.run().unwrap();
let actual = batches_to_lines(&batches);
// Record the batch sizes of this iteration as the snapshot description.
insta::with_settings!({
description => format!("input_batch_size: {input_batch_size}, output_batch_size: {output_batch_size}"),
}, {
insta::assert_yaml_snapshot!(actual, @r###"
---
- +----+--------------------------+----+
- "| g0 | time | a0 |"
- +----+--------------------------+----+
- "| a | 1970-01-01T00:00:00.975Z | |"
- "| a | 1970-01-01T00:00:01Z | 10 |"
- "| a | 1970-01-01T00:00:01.025Z | 10 |"
- "| a | 1970-01-01T00:00:01.050Z | 10 |"
- "| a | 1970-01-01T00:00:01.075Z | 10 |"
- "| a | 1970-01-01T00:00:01.100Z | 10 |"
- "| a | 1970-01-01T00:00:01.125Z | 10 |"
- "| b | 1970-01-01T00:00:00.975Z | |"
- "| b | 1970-01-01T00:00:01Z | 20 |"
- "| b | 1970-01-01T00:00:01.025Z | 20 |"
- "| b | 1970-01-01T00:00:01.050Z | 20 |"
- "| b | 1970-01-01T00:00:01.075Z | 20 |"
- "| b | 1970-01-01T00:00:01.100Z | 21 |"
- "| b | 1970-01-01T00:00:01.125Z | 21 |"
- +----+--------------------------+----+
"###)
});
// NOTE(review): helper defined elsewhere; presumably checks the produced
// batches against `output_batch_size` -- confirm.
assert_batch_count(&batches, output_batch_size);
}
}}
}
/// Gap fill with `FillStrategy::PrevNullAsMissing` when the input has runs of
/// null aggregate values and rows with null timestamps.
///
/// What the expected snapshot pins down:
/// - each series starts with its null-timestamp row, emitted with an empty
///   time and its value (-1 / -2) unchanged;
/// - series `a`: the nulls at 1000, 1025 and 1050 and the absent 1075 slot all
///   carry 10 (the value at 975) until 12 arrives at 1100;
/// - series `b`: the null at 975 and the absent 1000 slot stay empty because
///   there is no earlier value; 21 is then carried across the null at 1050 and
///   the absent 1075 slot until 22 arrives at 1100.
///
/// The same snapshot is asserted for every combination of input and output
/// batch size in the loops below.
#[test]
fn test_gapfill_fill_prev_null_as_missing_many_nulls() {
test_helpers::maybe_start_logging();
// `allow_duplicates!` is needed because the single inline snapshot below is
// asserted once per loop iteration.
insta::allow_duplicates! {
for output_batch_size in [16, 1] {
for input_batch_size in [8, 1] {
let records = TestRecords {
// One group column; six rows for series "a", then five for series "b".
group_cols: vec![vec![
Some("a"),
Some("a"),
Some("a"),
Some("a"),
Some("a"),
Some("a"),
// --- new series
Some("b"),
Some("b"),
Some("b"),
Some("b"),
Some("b"),
]],
// Each series begins with a null-timestamp row. Commented-out values mark
// 25 ms slots that appear in the expected output but have no input row.
time_col: vec![
None,
Some(975),
Some(1000),
Some(1025),
Some(1050),
// 1075
Some(1100),
// 1125
// --- new series
None,
Some(975),
// 1000
Some(1025),
Some(1050),
// 1075
Some(1100),
// 1125
],
// One aggregate column, aligned row-for-row with `time_col`.
agg_cols: vec![vec![
Some(-1), // a: null ts
Some(10), // a: 975
None, // a: 1000
None, // a: 1025 (stashed)
None, // a: 1050 (stashed)
// a: 1075 (stashed)
Some(12), // a: 1100
// a: 1125
// --- new series
Some(-2), // b: null ts
None, // b: 975
// b: 1000
Some(21), // b: 1025
None, // b: 1050
// b: 1075
Some(22), // b: 1100
// b: 1125
]],
input_batch_size,
};
// NOTE(review): arguments are presumably stride, optional range start and
// range end in milliseconds -- this matches the snapshot (one row every
// 25 ms from 975 through 1125) but the helper is defined elsewhere; confirm.
let params = get_params_ms_with_fill_strategy(&records, 25, Some(975), 1_125, FillStrategy::PrevNullAsMissing);
let tc = TestCase {
test_records: records,
output_batch_size,
params,
};
let batches = tc.run().unwrap();
let actual = batches_to_lines(&batches);
// Record the batch sizes of this iteration as the snapshot description.
insta::with_settings!({
description => format!("input_batch_size: {input_batch_size}, output_batch_size: {output_batch_size}"),
}, {
insta::assert_yaml_snapshot!(actual, @r###"
---
- +----+--------------------------+----+
- "| g0 | time | a0 |"
- +----+--------------------------+----+
- "| a | | -1 |"
- "| a | 1970-01-01T00:00:00.975Z | 10 |"
- "| a | 1970-01-01T00:00:01Z | 10 |"
- "| a | 1970-01-01T00:00:01.025Z | 10 |"
- "| a | 1970-01-01T00:00:01.050Z | 10 |"
- "| a | 1970-01-01T00:00:01.075Z | 10 |"
- "| a | 1970-01-01T00:00:01.100Z | 12 |"
- "| a | 1970-01-01T00:00:01.125Z | 12 |"
- "| b | | -2 |"
- "| b | 1970-01-01T00:00:00.975Z | |"
- "| b | 1970-01-01T00:00:01Z | |"
- "| b | 1970-01-01T00:00:01.025Z | 21 |"
- "| b | 1970-01-01T00:00:01.050Z | 21 |"
- "| b | 1970-01-01T00:00:01.075Z | 21 |"
- "| b | 1970-01-01T00:00:01.100Z | 22 |"
- "| b | 1970-01-01T00:00:01.125Z | 22 |"
- +----+--------------------------+----+
"###)
});
// NOTE(review): helper defined elsewhere; presumably checks the produced
// batches against `output_batch_size` -- confirm.
assert_batch_count(&batches, output_batch_size);
}
}}
}
#[test]
fn test_gapfill_simple_no_lower_bound_with_nulls() {
test_helpers::maybe_start_logging();
@ -660,28 +839,6 @@ fn test_gapfill_simple_no_lower_bound_with_nulls() {
}}
}
/// Expects a gap-fill run to fail with `DataFusionError::NotImplemented` when
/// the first aggregate column's fill strategy is overridden to
/// `FillStrategy::PrevNullAsMissing`.
///
/// NOTE(review): other tests in this file run `FillStrategy::PrevNullAsMissing`
/// and expect successful output; confirm whether this "not implemented"
/// expectation is still current.
#[test]
fn test_gapfill_strategy_not_implemented() {
    test_helpers::maybe_start_logging();

    // A single size is used for both the input and the output batches.
    let batch_size = 1024;

    // Two rows of series "a", at 1_000 and 1_100, with one aggregate column.
    let test_records = TestRecords {
        group_cols: vec![vec![Some("a"), Some("a")]],
        time_col: vec![Some(1_000), Some(1_100)],
        agg_cols: vec![vec![Some(10), Some(11)]],
        input_batch_size: batch_size,
    };

    // Start from the default parameters, then swap in the strategy under test.
    let params = {
        let mut overridden = get_params_ms(&test_records, 25, Some(975), 1_125);
        overridden.fill_strategy[0].1 = FillStrategy::PrevNullAsMissing;
        overridden
    };

    let test_case = TestCase {
        test_records,
        output_batch_size: batch_size,
        params,
    };
    let outcome = test_case.run();

    assert_error!(outcome, DataFusionError::NotImplemented(ref msg) if msg == "unsupported gap fill strategy PrevNullAsMissing");
}
#[test]
fn test_gapfill_oom() {
// Show that a graceful error is produced if memory limit is exceeded