refactor: Organize window_bounds the same as selector functions (#594)

* refactor: Organize window_bounds the same as selector functions

* fix: add missing file
pull/24376/head
Andrew Lamb 2020-12-21 12:51:36 -05:00 committed by GitHub
parent bb96142564
commit 28eac06d8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 141 additions and 128 deletions

View File

@ -8,9 +8,6 @@ mod schema_pivot;
pub mod seriesset; pub mod seriesset;
pub mod stringset; pub mod stringset;
// Export function to make window bounds without exposing its implementation
pub use planning::make_window_bound_expr;
use std::sync::Arc; use std::sync::Arc;
use arrow_deps::{ use arrow_deps::{

View File

@ -4,17 +4,12 @@
use std::sync::Arc; use std::sync::Arc;
use arrow_deps::{ use arrow_deps::{
arrow::{ arrow::record_batch::RecordBatch,
array::{ArrayRef, Int64Array, Int64Builder},
datatypes::DataType,
record_batch::RecordBatch,
},
datafusion::{ datafusion::{
execution::context::{ExecutionContextState, QueryPlanner}, execution::context::{ExecutionContextState, QueryPlanner},
logical_plan::{Expr, LogicalPlan, UserDefinedLogicalNode}, logical_plan::{LogicalPlan, UserDefinedLogicalNode},
physical_plan::{ physical_plan::{
collect, collect,
functions::ScalarFunctionImplementation,
merge::MergeExec, merge::MergeExec,
planner::{DefaultPhysicalPlanner, ExtensionPlanner}, planner::{DefaultPhysicalPlanner, ExtensionPlanner},
ExecutionPlan, PhysicalPlanner, SendableRecordBatchStream, ExecutionPlan, PhysicalPlanner, SendableRecordBatchStream,
@ -24,8 +19,6 @@ use arrow_deps::{
}; };
use crate::exec::schema_pivot::{SchemaPivotExec, SchemaPivotNode}; use crate::exec::schema_pivot::{SchemaPivotExec, SchemaPivotNode};
use crate::group_by::WindowDuration;
use crate::window;
use tracing::debug; use tracing::debug;
@ -147,114 +140,3 @@ impl IOxExecutionContext {
} }
} }
} }
/// This is the implementation of the `window_bounds` user defined
/// function used in IOx to compute window boundaries when doing
/// grouping by windows.
fn window_bounds(
args: &[ArrayRef],
every: &WindowDuration,
offset: &WindowDuration,
) -> Result<ArrayRef> {
// Note: At the time of writing, DataFusion creates arrays of constants for
// constant arguments (which 4 of 5 arguments to window bounds are). We
// should eventually contribute some way back upstream to make DataFusion
// pass 4 constants rather than 4 arrays of constants.
// There are any number of ways this function could also be further
// optimized, which we leave as an exercise to our future selves
// `args` and output are dynamically-typed Arrow arrays, which means that we
// need to:
//
// 1. cast the values to the type we want
// 2. perform the window_bounds calculation for every element in the
// timestamp array
// 3. construct the resulting array
// this is guaranteed by DataFusion based on the function's signature.
assert_eq!(args.len(), 1);
let time = &args[0]
.as_any()
.downcast_ref::<Int64Array>()
.expect("cast of time failed");
// Note: the Go code uses the `Stop` field of the `GetEarliestBounds` call as
// the window boundary https://github.com/influxdata/influxdb/blob/master/storage/reads/array_cursor.gen.go#L546
// Note window doesn't use the period argument
let period = window::Duration::from_nsecs(0);
let window = window::Window::new(every.into(), period, offset.into());
// calculate the output times, one at a time, one element at a time
let mut builder = Int64Builder::new(time.len());
time.iter().try_for_each(|ts| match ts {
Some(ts) => {
let bounds = window.get_earliest_bounds(ts);
builder.append_value(bounds.stop)
}
None => builder.append_null(),
})?;
Ok(Arc::new(builder.finish()))
}
/// Create a DataFusion `Expr` that invokes `window_bounds` with the
/// appropriate every and offset arguments at runtime
pub fn make_window_bound_expr(
time_arg: Expr,
every: &WindowDuration,
offset: &WindowDuration,
) -> Expr {
// Bind a copy of the arguments in a closure
let every = every.clone();
let offset = offset.clone();
let func_ptr: ScalarFunctionImplementation =
Arc::new(move |args| window_bounds(args, &every, &offset));
let udf = create_udf(
"window_bounds",
vec![DataType::Int64], // argument types
Arc::new(DataType::Int64), // return type
func_ptr,
);
udf.call(vec![time_arg])
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_window_bounds() {
let input: ArrayRef = Arc::new(Int64Array::from(vec![
Some(100),
None,
Some(200),
Some(300),
Some(400),
]));
let every = WindowDuration::from_nanoseconds(200);
let offset = WindowDuration::from_nanoseconds(50);
let bounds_array =
window_bounds(&[input], &every, &offset).expect("window_bounds executed correctly");
let expected_array: ArrayRef = Arc::new(Int64Array::from(vec![
Some(250),
None,
Some(250),
Some(450),
Some(450),
]));
assert_eq!(
&expected_array, &bounds_array,
"Expected:\n{:?}\nActual:\n{:?}",
expected_array, bounds_array,
);
}
}

View File

@ -1,2 +1,3 @@
//! Special IOx functions used in DataFusion plans //! Special IOx functions used in DataFusion plans
pub mod selectors; pub mod selectors;
pub mod window;

131
query/src/func/window.rs Normal file
View File

@ -0,0 +1,131 @@
mod internal;
pub use internal::{Duration, Window};
use std::sync::Arc;
use arrow_deps::{
arrow::{
array::{ArrayRef, Int64Array, Int64Builder},
datatypes::DataType,
},
datafusion::{
logical_plan::Expr, physical_plan::functions::ScalarFunctionImplementation, prelude::*,
},
};
use crate::group_by::WindowDuration;
// Reuse DataFusion error and Result types for this module
pub use arrow_deps::datafusion::error::{DataFusionError as Error, Result};
/// This is the implementation of the `window_bounds` user defined
/// function used in IOx to compute window boundaries when doing
/// grouping by windows.
fn window_bounds(
args: &[ArrayRef],
every: &WindowDuration,
offset: &WindowDuration,
) -> Result<ArrayRef> {
// Note: At the time of writing, DataFusion creates arrays of constants for
// constant arguments (which 4 of 5 arguments to window bounds are). We
// should eventually contribute some way back upstream to make DataFusion
// pass 4 constants rather than 4 arrays of constants.
// There are any number of ways this function could also be further
// optimized, which we leave as an exercise to our future selves
// `args` and output are dynamically-typed Arrow arrays, which means that we
// need to:
//
// 1. cast the values to the type we want
// 2. perform the window_bounds calculation for every element in the
// timestamp array
// 3. construct the resulting array
// this is guaranteed by DataFusion based on the function's signature.
assert_eq!(args.len(), 1);
let time = &args[0]
.as_any()
.downcast_ref::<Int64Array>()
.expect("cast of time failed");
// Note: the Go code uses the `Stop` field of the `GetEarliestBounds` call as
// the window boundary https://github.com/influxdata/influxdb/blob/master/storage/reads/array_cursor.gen.go#L546
// Note window doesn't use the period argument
let period = internal::Duration::from_nsecs(0);
let window = internal::Window::new(every.into(), period, offset.into());
// calculate the output times, one at a time, one element at a time
let mut builder = Int64Builder::new(time.len());
time.iter().try_for_each(|ts| match ts {
Some(ts) => {
let bounds = window.get_earliest_bounds(ts);
builder.append_value(bounds.stop)
}
None => builder.append_null(),
})?;
Ok(Arc::new(builder.finish()))
}
/// Create a DataFusion `Expr` that invokes `window_bounds` with the
/// appropriate every and offset arguments at runtime
pub fn make_window_bound_expr(
time_arg: Expr,
every: &WindowDuration,
offset: &WindowDuration,
) -> Expr {
// Bind a copy of the arguments in a closure
let every = every.clone();
let offset = offset.clone();
let func_ptr: ScalarFunctionImplementation =
Arc::new(move |args| window_bounds(args, &every, &offset));
let udf = create_udf(
"window_bounds",
vec![DataType::Int64], // argument types
Arc::new(DataType::Int64), // return type
func_ptr,
);
udf.call(vec![time_arg])
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_window_bounds() {
let input: ArrayRef = Arc::new(Int64Array::from(vec![
Some(100),
None,
Some(200),
Some(300),
Some(400),
]));
let every = WindowDuration::from_nanoseconds(200);
let offset = WindowDuration::from_nanoseconds(50);
let bounds_array =
window_bounds(&[input], &every, &offset).expect("window_bounds executed correctly");
let expected_array: ArrayRef = Arc::new(Int64Array::from(vec![
Some(250),
None,
Some(250),
Some(450),
Some(450),
]));
assert_eq!(
&expected_array, &bounds_array,
"Expected:\n{:?}\nActual:\n{:?}",
expected_array, bounds_array,
);
}
}

View File

@ -2,10 +2,11 @@
//! and Aggregate functions in IOx, designed to be compatible with //! and Aggregate functions in IOx, designed to be compatible with
//! InfluxDB classic //! InfluxDB classic
use crate::window;
use arrow_deps::datafusion::logical_plan::Expr; use arrow_deps::datafusion::logical_plan::Expr;
use snafu::Snafu; use snafu::Snafu;
use crate::func::window;
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display( #[snafu(display(

View File

@ -19,7 +19,6 @@ pub mod group_by;
pub mod id; pub mod id;
pub mod predicate; pub mod predicate;
pub mod util; pub mod util;
pub mod window;
use self::group_by::GroupByAndAggregate; use self::group_by::GroupByAndAggregate;
use self::predicate::{Predicate, TimestampRange}; use self::predicate::{Predicate, TimestampRange};

View File

@ -1,7 +1,9 @@
use generated_types::wal as wb; use generated_types::wal as wb;
use query::exec::make_window_bound_expr; use query::{
use query::exec::{make_schema_pivot, SeriesSetPlan}; exec::{make_schema_pivot, SeriesSetPlan},
use query::group_by::{Aggregate, WindowDuration}; func::window::make_window_bound_expr,
group_by::{Aggregate, WindowDuration},
};
use tracing::debug; use tracing::debug;
use std::{collections::BTreeSet, collections::HashMap, sync::Arc}; use std::{collections::BTreeSet, collections::HashMap, sync::Arc};