feat: add a type to characterize fill strategy (#7150)

* feat: add a type to characterize fill strategy

* chore: clippy and fix comment
pull/24376/head
Christopher M. Wolff 2023-03-07 09:11:31 -08:00 committed by GitHub
parent 8c0e23098f
commit 3f3a47eae9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 184 additions and 15 deletions

View File

@ -52,6 +52,25 @@ pub(crate) struct GapFillParams {
/// which implies that gap-filling should just start from the
/// first point in each series.
pub time_range: Range<Bound<Expr>>,
/// What to do when filling aggregate columns.
/// The first item in the tuple will be the column
/// reference for the aggregate column.
pub fill_strategy: Vec<(Expr, FillStrategy)>,
}
/// Describes how to fill gaps in an aggregate column.
#[derive(Clone, Debug, PartialEq)]
pub enum FillStrategy {
/// Fill with null values.
/// This is the InfluxQL behavior for `FILL(NULL)` or `FILL(NONE)`.
Null,
/// Fill with the most recent value in the input column.
#[allow(dead_code)]
Prev,
/// Fill with the most recent non-null value in the input column.
/// This is the InfluxQL behavior for `FILL(PREVIOUS)`.
#[allow(dead_code)]
PrevNullAsMissing,
}
impl GapFillParams {
@ -74,7 +93,7 @@ impl GapFillParams {
}
#[allow(clippy::wrong_self_convention)] // follows convention of UserDefinedLogicalNode
fn from_template(&self, exprs: &[Expr]) -> Self {
fn from_template(&self, exprs: &[Expr], aggr_expr: &[Expr]) -> Self {
assert!(
exprs.len() >= 3,
"should be a at least stride, source and origin in params"
@ -89,11 +108,24 @@ impl GapFillParams {
})
})
.unwrap();
let fill_strategy = aggr_expr
.iter()
.cloned()
.zip(
self.fill_strategy
.iter()
.map(|(_expr, fill_strategy)| fill_strategy)
.cloned(),
)
.collect();
Self {
stride,
time_column,
origin,
time_range,
fill_strategy,
}
}
}
@ -161,7 +193,7 @@ impl UserDefinedLogicalNode for GapFill {
let mut group_expr: Vec<_> = exprs.to_vec();
let mut aggr_expr = group_expr.split_off(self.group_expr.len());
let param_expr = aggr_expr.split_off(self.aggr_expr.len());
let params = self.params.from_template(&param_expr);
let params = self.params.from_template(&param_expr, &aggr_expr);
let gapfill = Self::try_new(Arc::new(inputs[0].clone()), group_expr, aggr_expr, params)
.expect("should not fail");
Arc::new(gapfill)
@ -228,11 +260,24 @@ pub(crate) fn plan_gap_fill(
execution_props,
)?;
let fill_strategy = gap_fill
.params
.fill_strategy
.iter()
.map(|(e, fs)| {
Ok((
create_physical_expr(e, input_dfschema, input_schema, execution_props)?,
fs.clone(),
))
})
.collect::<Result<Vec<(Arc<dyn PhysicalExpr>, FillStrategy)>>>()?;
let params = GapFillExecParams {
stride,
time_column,
origin,
time_range,
fill_strategy,
};
GapFillExec::try_new(
Arc::clone(&physical_inputs[0]),
@ -269,6 +314,7 @@ fn bound_extract<T>(b: &Bound<T>) -> Option<&T> {
Bound::Unbounded => None,
}
}
/// A physical node for the gap-fill operation.
pub struct GapFillExec {
input: Arc<dyn ExecutionPlan>,
@ -296,6 +342,9 @@ struct GapFillExecParams {
/// The time range of source input to DATE_BIN_GAPFILL.
/// Inferred from predicates in the overall query.
time_range: Range<Bound<Arc<dyn PhysicalExpr>>>,
/// What to do when filling aggregate columns.
/// The 0th element in each tuple is the aggregate column.
fill_strategy: Vec<(Arc<dyn PhysicalExpr>, FillStrategy)>,
}
impl GapFillExec {
@ -482,7 +531,9 @@ mod test {
error::Result,
execution::runtime_env::{RuntimeConfig, RuntimeEnv},
logical_expr::{logical_plan, Extension},
physical_plan::{collect, expressions::lit as phys_lit, memory::MemoryExec},
physical_plan::{
collect, expressions::col as phys_col, expressions::lit as phys_lit, memory::MemoryExec,
},
prelude::{col, lit, lit_timestamp_nano, SessionConfig, SessionContext},
scalar::ScalarValue,
};
@ -507,6 +558,10 @@ mod test {
logical_plan::table_scan(Some("temps"), &schema, None)?.build()
}
fn fill_strategy_null(cols: Vec<Expr>) -> Vec<(Expr, FillStrategy)> {
cols.into_iter().map(|e| (e, FillStrategy::Null)).collect()
}
#[test]
fn test_try_new_errs() {
let scan = table_scan().unwrap();
@ -522,6 +577,7 @@ mod test {
start: Bound::Included(lit_timestamp_nano(1000)),
end: Bound::Unbounded,
},
fill_strategy: fill_strategy_null(vec![col("temp")]),
},
);
@ -569,6 +625,7 @@ mod test {
time_column: col("time"),
origin: lit_timestamp_nano(0),
time_range,
fill_strategy: fill_strategy_null(vec![col("temp")]),
},
)
.unwrap();
@ -594,6 +651,7 @@ mod test {
start: Bound::Included(lit_timestamp_nano(1000)),
end: Bound::Excluded(lit_timestamp_nano(2000)),
},
fill_strategy: fill_strategy_null(vec![col("temp")]),
},
)?;
let plan = LogicalPlan::Extension(Extension {
@ -1234,6 +1292,29 @@ mod test {
}
}
#[tokio::test]
async fn test_gapfill_strategy_not_implemented() -> Result<()> {
test_helpers::maybe_start_logging();
let input_batch_size = 1024;
let output_batch_size = input_batch_size;
let batch = TestRecords {
group_cols: vec![vec![Some("a"), Some("a")]],
time_col: vec![Some(1_000), Some(1_100)],
agg_cols: vec![vec![Some(10), Some(11)]],
input_batch_size,
};
let mut params = get_params_ms(&batch, 25, Some(975), 1_125);
params.fill_strategy[0].1 = FillStrategy::PrevNullAsMissing;
let tc = TestCase {
test_records: batch,
output_batch_size,
params,
};
let result = tc.run().await;
assert_error!(result, DataFusionError::NotImplemented(ref msg) if msg == "unsupported gap fill strategy PrevNullAsMissing");
Ok(())
}
#[tokio::test]
async fn test_gapfill_oom() {
// Show that a graceful error is produced if memory limit is exceeded
@ -1437,6 +1518,18 @@ mod test {
}
}
fn phys_fill_strategy_null(
records: &TestRecords,
) -> Result<Vec<(Arc<dyn PhysicalExpr>, FillStrategy)>> {
let start = records.group_cols.len() + 1; // 1 is for time col
let end = start + records.agg_cols.len();
let mut v = Vec::with_capacity(records.agg_cols.len());
for f in records.schema().fields()[start..end].iter() {
v.push((phys_col(f.name(), &records.schema())?, FillStrategy::Null));
}
Ok(v)
}
fn get_params_ms(
batch: &TestRecords,
stride: i64,
@ -1461,6 +1554,7 @@ mod test {
None,
))),
},
fill_strategy: phys_fill_strategy_null(batch).unwrap(),
}
}
}

View File

@ -11,7 +11,7 @@ use arrow::{
};
use datafusion::error::{DataFusionError, Result};
use super::params::GapFillParams;
use super::{params::GapFillParams, FillStrategy};
/// Provides methods to the [`GapFillStream`](super::stream::GapFillStream)
/// module that fill gaps in buffered input.
@ -270,8 +270,19 @@ impl GapFiller {
// Build the aggregate columns
for (idx, aa) in aggr_arr.iter() {
let mut cursor = self.cursor.clone();
let take_vec =
cursor.build_aggr_take_vec(&self.params, series_ends, input_time_array)?;
let take_vec = match self.params.fill_strategy.get(idx) {
Some(FillStrategy::Null) => cursor.build_aggr_take_vec_fill_null(
&self.params,
series_ends,
input_time_array,
),
Some(fs) => Err(DataFusionError::NotImplemented(format!(
"unsupported gap fill strategy {fs:?}"
))),
None => Err(DataFusionError::Internal(format!(
"could not find fill strategy for aggregate column with index {idx}"
))),
}?;
if take_vec.len() != output_time_len {
return Err(DataFusionError::Internal(format!(
"gapfill aggr column has {} rows, expected {}",
@ -421,7 +432,7 @@ impl Cursor {
/// Builds a vector that can use the [`take`](take::take) kernel
/// to produce an aggregate output column.
fn build_aggr_take_vec(
fn build_aggr_take_vec_fill_null(
&mut self,
params: &GapFillParams,
series_ends: &[usize],
@ -584,8 +595,9 @@ enum RowStatus {
mod tests {
use arrow::array::TimestampNanosecondArray;
use datafusion::error::Result;
use hashbrown::HashMap;
use crate::exec::gapfill::{algo::Cursor, params::GapFillParams};
use crate::exec::gapfill::{algo::Cursor, params::GapFillParams, FillStrategy};
#[test]
fn test_cursor_append_time_values() -> Result<()> {
@ -597,6 +609,7 @@ mod tests {
stride: 50,
first_ts: Some(950),
last_ts: 1250,
fill_strategy: simple_fill_strategy(),
};
let output_batch_size = 10000;
@ -642,6 +655,7 @@ mod tests {
stride: 50,
first_ts: None,
last_ts: 1250,
fill_strategy: simple_fill_strategy(),
};
let output_batch_size = 10000;
@ -680,6 +694,7 @@ mod tests {
stride: 50,
first_ts: Some(950),
last_ts: 1250,
fill_strategy: simple_fill_strategy(),
};
let output_batch_size = 10000;
@ -725,6 +740,7 @@ mod tests {
stride: 50,
first_ts: Some(950),
last_ts: 1250,
fill_strategy: simple_fill_strategy(),
};
let output_batch_size = 10000;
@ -757,6 +773,7 @@ mod tests {
stride: 50,
first_ts: Some(950),
last_ts: 1250,
fill_strategy: simple_fill_strategy(),
};
let output_batch_size = 10000;
@ -766,7 +783,7 @@ mod tests {
remaining_output_batch_size: output_batch_size,
};
let take_idxs = cursor.build_aggr_take_vec(&params, &[series], &input_times)?;
let take_idxs = cursor.build_aggr_take_vec_fill_null(&params, &[series], &input_times)?;
assert_eq!(
vec![None, Some(0), None, Some(1), None, Some(2), None],
take_idxs
@ -795,6 +812,7 @@ mod tests {
stride: 50,
first_ts: Some(950),
last_ts: 1250,
fill_strategy: simple_fill_strategy(),
};
let output_batch_size = 10000;
@ -804,7 +822,7 @@ mod tests {
remaining_output_batch_size: output_batch_size,
};
let take_idxs = cursor.build_aggr_take_vec(&params, &[series], &input_times)?;
let take_idxs = cursor.build_aggr_take_vec_fill_null(&params, &[series], &input_times)?;
assert_eq!(
vec![
Some(0), // corresopnds to null ts
@ -839,6 +857,7 @@ mod tests {
stride: 50,
first_ts: Some(950),
last_ts: 1350,
fill_strategy: simple_fill_strategy(),
};
let input_times = TimestampNanosecondArray::from(vec![
// 950
@ -916,6 +935,7 @@ mod tests {
stride: 50,
first_ts: Some(950),
last_ts: 1350,
fill_strategy: simple_fill_strategy(),
};
let input_times = TimestampNanosecondArray::from(vec![
None,
@ -999,6 +1019,7 @@ mod tests {
stride: 50,
first_ts: Some(1000),
last_ts: 1100,
fill_strategy: simple_fill_strategy(),
};
let input_times = TimestampNanosecondArray::from(vec![
None,
@ -1081,6 +1102,7 @@ mod tests {
stride: 50,
first_ts: Some(1000),
last_ts: 1100,
fill_strategy: simple_fill_strategy(),
};
let input_times = TimestampNanosecondArray::from(vec![
None,
@ -1162,6 +1184,7 @@ mod tests {
stride: 100,
first_ts: Some(200),
last_ts: 1000,
fill_strategy: simple_fill_strategy(),
};
let input_times = TimestampNanosecondArray::from(vec![300, 500, 700, 800]);
let series = input_times.len();
@ -1255,6 +1278,7 @@ mod tests {
stride: 50,
first_ts: Some(1000),
last_ts: 1200,
fill_strategy: simple_fill_strategy(),
};
let input_times = TimestampNanosecondArray::from(vec![
1000, // 1050
@ -1382,6 +1406,10 @@ mod tests {
cursor.count_series_rows(params, input_times, series_end)
}
fn simple_fill_strategy() -> HashMap<usize, FillStrategy> {
std::iter::once((1, FillStrategy::Null)).collect()
}
struct Expected {
times: Vec<Option<i64>>,
group_take: Vec<u64>,
@ -1407,7 +1435,8 @@ mod tests {
.build_group_take_vec(params, &[series_end], input_times)?;
assert_eq!(expected.group_take, actual_group_take, "{desc} group take");
let actual_aggr_take = cursor.build_aggr_take_vec(params, &[series_end], input_times)?;
let actual_aggr_take =
cursor.build_aggr_take_vec_fill_null(params, &[series_end], input_times)?;
assert_eq!(expected.aggr_take, actual_aggr_take, "{desc} aggr take");
Ok(())

View File

@ -9,11 +9,12 @@ use chrono::Duration;
use datafusion::{
error::{DataFusionError, Result},
physical_expr::datetime_expressions::date_bin,
physical_plan::ColumnarValue,
physical_plan::{expressions::Column, ColumnarValue},
scalar::ScalarValue,
};
use hashbrown::HashMap;
use super::{try_map_bound, try_map_range, GapFillExecParams};
use super::{try_map_bound, try_map_range, FillStrategy, GapFillExecParams};
/// The parameters to gap filling. Included here are the parameters
/// that remain constant during gap filling, i.e., not the streaming table
@ -30,6 +31,9 @@ pub(super) struct GapFillParams {
/// The last timestamp (inclusive!) to be output for each series,
/// in nanoseconds since the epoch.
pub last_ts: i64,
/// What to do when filling gaps in aggregate columns.
/// The map is keyed on the columns offset in the schema.
pub fill_strategy: HashMap<usize, FillStrategy>,
}
impl GapFillParams {
@ -76,10 +80,26 @@ impl GapFillParams {
args[1] = i64_to_columnar_ts(Some(last_ts));
let last_ts = extract_timestamp_nanos(&date_bin(&args)?)?;
let fill_strategy = params
.fill_strategy
.iter()
.map(|(e, fs)| {
let idx = e
.as_any()
.downcast_ref::<Column>()
.ok_or(DataFusionError::Internal(format!(
"fill strategy aggr expr was not a column: {e:?}",
)))?
.index();
Ok((idx, fs.clone()))
})
.collect::<Result<HashMap<usize, FillStrategy>>>()?;
Ok(Self {
stride: extract_interval_nanos(&args[0])?,
first_ts,
last_ts,
fill_strategy,
})
}
@ -144,9 +164,10 @@ mod tests {
},
scalar::ScalarValue,
};
use hashbrown::HashMap;
use crate::exec::{
gapfill::{GapFillExec, GapFillExecParams},
gapfill::{FillStrategy, GapFillExec, GapFillExecParams},
Executor, ExecutorType,
};
@ -167,6 +188,7 @@ mod tests {
stride: 60_000_000_000, // 1 minute
first_ts: Some(441_820_500_000_000_000), // Sunday, January 1, 1984 3:55:00 PM
last_ts: 441_820_800_000_000_000, // Sunday, January 1, 1984 3:59:00 PM
fill_strategy: HashMap::new(),
};
assert_eq!(expected, actual);
Ok(())
@ -188,6 +210,7 @@ mod tests {
first_ts: Some(441_820_500_000_000_000), // Sunday, January 1, 1984 3:55:00 PM
// Last bin at 16:00 is excluded
last_ts: 441_820_740_000_000_000, // Sunday, January 1, 1984 3:59:00 PM
fill_strategy: HashMap::new(),
};
assert_eq!(expected, actual);
Ok(())
@ -209,6 +232,7 @@ mod tests {
// First bin not exluded since it truncates to 15:55:00
first_ts: Some(441_820_500_000_000_000), // Sunday, January 1, 1984 3:55:00 PM
last_ts: 441_820_800_000_000_000, // Sunday, January 1, 1984 3:59:00 PM
fill_strategy: HashMap::new(),
};
assert_eq!(expected, actual);
Ok(())
@ -230,6 +254,7 @@ mod tests {
stride: 60_000_000_000, // 1 minute
first_ts: Some(441_820_449_000_000_000), // Sunday, January 1, 1984 3:54:09 PM
last_ts: 441_820_749_000_000_000, // Sunday, January 1, 1984 3:59:09 PM
fill_strategy: HashMap::new(),
};
assert_eq!(expected, actual);
Ok(())
@ -258,6 +283,11 @@ mod tests {
start: Bound::Unbounded,
end: Bound::Excluded(timestamp(20_000_000_000)),
},
fill_strategy: std::iter::once((
Arc::new(Column::new("a0", 1)) as Arc<dyn PhysicalExpr>,
FillStrategy::Null,
))
.collect(),
};
let actual = GapFillParams::try_new(schema().into(), &exec_params).unwrap();
@ -266,6 +296,7 @@ mod tests {
stride: 1_000_000_000,
first_ts: None,
last_ts: 19_000_000_000,
fill_strategy: simple_fill_strategy(),
},
actual
);
@ -279,6 +310,7 @@ mod tests {
stride: 10,
first_ts: Some(1000),
last_ts: 1050,
fill_strategy: simple_fill_strategy(),
};
assert_eq!(6, params.valid_row_count(1000));
@ -316,4 +348,8 @@ mod tests {
let schema = schema();
GapFillParams::try_new(schema.into(), exec_params)
}
fn simple_fill_strategy() -> HashMap<usize, FillStrategy> {
std::iter::once((1, FillStrategy::Null)).collect()
}
}

View File

@ -3,7 +3,7 @@
mod range_predicate;
use crate::exec::gapfill::{GapFill, GapFillParams};
use crate::exec::gapfill::{FillStrategy, GapFill, GapFillParams};
use datafusion::{
error::{DataFusionError, Result},
logical_expr::{
@ -182,6 +182,15 @@ fn build_gapfill_node(
.collect();
let aggr_expr = new_group_expr.split_off(aggr.group_expr.len());
// For now, we can only fill with null values.
// In the future, this rule will allow a projection to be pushed into the
// GapFill node, e.g., if it contains an item like `LOCF(<col>)`.
let fill_behavior = aggr_expr
.iter()
.cloned()
.map(|e| (e, FillStrategy::Null))
.collect();
Ok(LogicalPlan::Extension(Extension {
node: Arc::new(GapFill::try_new(
Arc::new(new_aggr_plan),
@ -192,6 +201,7 @@ fn build_gapfill_node(
time_column,
origin,
time_range,
fill_strategy: fill_behavior,
},
)?),
}))