feat: Hook up read_window_aggregate into the write_buffer, end-to-end tests (#483)
* feat: read_window_aggregate_plans
* fix: clippy sacrifice
* fix: clippy
* fix: clippy
parent c08744603b
commit 0eaa90e89d
@@ -8,6 +8,9 @@ mod schema_pivot;
 pub mod seriesset;
 pub mod stringset;
 
+// Export function to make window bounds without exposing its implementation
+pub use planning::make_window_bound_expr;
+
 use std::sync::Arc;
 
 use arrow_deps::{
@@ -151,9 +154,9 @@ pub struct SeriesSetPlan {
    /// Datafusion plan to execute. The plan must produce
    /// RecordBatches that have:
    ///
-    /// * fields with matching names for each value of `tag_columns` and `field_columns`
-    /// * include the timestamp column
-    /// * each column named in tag_columns must be a String (Utf8)
+    /// * fields for each name in `tag_columns` and `field_columns`
+    /// * a timestamp column called 'time'
+    /// * each column in tag_columns must be a String (Utf8)
    pub plan: LogicalPlan,
 
    /// The names of the columns that define tags.
@@ -2,6 +2,13 @@
 
 use std::sync::Arc;
 
+use arrow_deps::arrow::array::ArrayRef;
+use arrow_deps::arrow::array::Int64Array;
+use arrow_deps::arrow::array::Int64Builder;
+use arrow_deps::arrow::datatypes::DataType;
+use arrow_deps::datafusion::logical_plan::Expr;
+use arrow_deps::datafusion::physical_plan::functions::ScalarFunctionImplementation;
+use arrow_deps::datafusion::prelude::create_udf;
 use arrow_deps::{
     arrow::record_batch::RecordBatch,
     datafusion::physical_plan::merge::MergeExec,
@@ -20,6 +27,8 @@ use arrow_deps::{
 };
 
 use crate::exec::schema_pivot::{SchemaPivotExec, SchemaPivotNode};
+use crate::group_by::WindowDuration;
+use crate::window;
 
 use tracing::debug;
 
@@ -141,3 +150,113 @@ impl IOxExecutionContext {
         }
     }
 }
+
+/// This is the implementation of the `window_bounds` user defined
+/// function used in IOx to compute window boundaries when doing
+/// grouping by windows.
+fn window_bounds(
+    args: &[ArrayRef],
+    every: &WindowDuration,
+    offset: &WindowDuration,
+) -> Result<ArrayRef> {
+    // Note: At the time of writing, DataFusion creates arrays of
+    // constants for constant arguments (which 4 of 5 arguments to
+    // window bounds are). We should eventually contribute some way
+    // back upstream to make DataFusion pass 4 constants rather than 4
+    // arrays of constants.
+
+    // There are any number of ways this function could also be
+    // further optimized, which we leave as an exercise to our future
+    // selves
+
+    // `args` and output are dynamically-typed Arrow arrays, which means that we need to:
+    // 1. cast the values to the type we want
+    // 2. perform the window_bounds calculation for every element in the timestamp array
+    // 3. construct the resulting array
+
+    // this is guaranteed by DataFusion based on the function's signature.
+    assert_eq!(args.len(), 1);
+
+    let time = &args[0]
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .expect("cast of time failed");
+
+    // Note: the Go code uses the `Stop` field of the `GetEarliestBounds` call as the window boundary
+    // https://github.com/influxdata/influxdb/blob/master/storage/reads/array_cursor.gen.go#L546
+
+    // Note window doesn't use the period argument
+    let period = window::Duration::from_nsecs(0);
+    let window = window::Window::new(every.into(), period, offset.into());
+
+    // calculate the output times, one element at a time
+    let mut builder = Int64Builder::new(time.len());
+    time.iter().try_for_each(|ts| match ts {
+        Some(ts) => {
+            let bounds = window.get_earliest_bounds(ts);
+            builder.append_value(bounds.stop)
+        }
+        None => builder.append_null(),
+    })?;
+
+    Ok(Arc::new(builder.finish()))
+}
+
+/// Create a DataFusion `Expr` that invokes `window_bounds` with the
+/// appropriate every and offset arguments at runtime
+pub fn make_window_bound_expr(
+    time_arg: Expr,
+    every: &WindowDuration,
+    offset: &WindowDuration,
+) -> Expr {
+    // Bind a copy of the arguments in a closure
+    let every = every.clone();
+    let offset = offset.clone();
+    let func_ptr: ScalarFunctionImplementation =
+        Arc::new(move |args| window_bounds(args, &every, &offset));
+
+    let udf = create_udf(
+        "window_bounds",
+        vec![DataType::Int64],     // argument types
+        Arc::new(DataType::Int64), // return type
+        func_ptr,
+    );
+
+    udf.call(vec![time_arg])
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_window_bounds() {
+        let input: ArrayRef = Arc::new(Int64Array::from(vec![
+            Some(100),
+            None,
+            Some(200),
+            Some(300),
+            Some(400),
+        ]));
+
+        let every = WindowDuration::from_nanoseconds(200);
+        let offset = WindowDuration::from_nanoseconds(50);
+
+        let bounds_array =
+            window_bounds(&[input], &every, &offset).expect("window_bounds executed correctly");
+
+        let expected_array: ArrayRef = Arc::new(Int64Array::from(vec![
+            Some(250),
+            None,
+            Some(250),
+            Some(450),
+            Some(450),
+        ]));
+
+        assert_eq!(
+            &expected_array, &bounds_array,
+            "Expected:\n{:?}\nActual:\n{:?}",
+            expected_array, bounds_array,
+        );
+    }
+}
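For intuition about what the UDF computes: for fixed durations, `get_earliest_bounds` reduces to modular arithmetic on nanosecond timestamps. Below is a minimal, dependency-free sketch of that math (an illustrative model only, not the IOx `window` implementation, which also handles month-based durations); it reproduces the expectations in `test_window_bounds` above.

// Minimal model of the earliest-window "stop" bound for fixed-width
// windows; `every` and `offset` are nanoseconds. Illustrative sketch,
// not the IOx implementation.
fn earliest_window_stop(t: i64, every: i64, offset: i64) -> i64 {
    // The window containing t starts at the largest grid point <= t,
    // where the grid is multiples of `every`, shifted by `offset`.
    let shifted = t - offset;
    let window_start = shifted - shifted.rem_euclid(every) + offset;
    // window_bounds reports the exclusive stop bound of that window.
    window_start + every
}

fn main() {
    // Matches the expected output of test_window_bounds (every = 200ns,
    // offset = 50ns): 100 -> 250, 200 -> 250, 300 -> 450, 400 -> 450.
    assert_eq!(earliest_window_stop(100, 200, 50), 250);
    assert_eq!(earliest_window_stop(200, 200, 50), 250);
    assert_eq!(earliest_window_stop(300, 200, 50), 450);
    assert_eq!(earliest_window_stop(400, 200, 50), 450);
}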
@@ -2,7 +2,22 @@
 //! and Aggregate functions in IOx, designed to be compatible with
 //! InfluxDB classic
 
-#[derive(Debug, Clone, PartialEq)]
+use crate::window;
+use arrow_deps::datafusion::logical_plan::Expr;
+use snafu::Snafu;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display(
+        "Aggregate not yet supported {}. See https://github.com/influxdata/influxdb_iox/issues/480",
+        agg
+    ))]
+    AggregateNotSupported { agg: String },
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(Debug, Clone, PartialEq, Copy)]
 pub enum Aggregate {
     Sum,
     Count,
@@ -59,6 +74,22 @@ pub enum WindowDuration {
     Fixed { nanoseconds: i64 },
 }
 
+impl Aggregate {
+    /// Create the appropriate DataFusion expression for this aggregate
+    pub fn to_datafusion_expr(&self, input: Expr) -> Result<Expr> {
+        use arrow_deps::datafusion::logical_plan::{avg, count, max, min, sum};
+        match self {
+            Self::Sum => Ok(sum(input)),
+            Self::Count => Ok(count(input)),
+            Self::Min => Ok(min(input)),
+            Self::Max => Ok(max(input)),
+            Self::First => AggregateNotSupported { agg: "First" }.fail(),
+            Self::Last => AggregateNotSupported { agg: "Last" }.fail(),
+            Self::Mean => Ok(avg(input)),
+        }
+    }
+}
+
 impl WindowDuration {
     pub fn from_nanoseconds(nanoseconds: i64) -> Self {
         Self::Fixed { nanoseconds }
@@ -68,3 +99,16 @@ impl WindowDuration {
         Self::Variable { months, negative }
     }
 }
+
+// Translation to the structures for the underlying window
+// implementation
+impl Into<window::Duration> for &WindowDuration {
+    fn into(self) -> window::Duration {
+        match *self {
+            WindowDuration::Variable { months, negative } => {
+                window::Duration::from_months_with_negative(months, negative)
+            }
+            WindowDuration::Fixed { nanoseconds } => window::Duration::from_nsecs(nanoseconds),
+        }
+    }
+}
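A note on the translation above: implementing `Into` directly works, but the usual Rust idiom is to implement `From` and receive `Into` for free via the standard blanket impl. A self-contained sketch of that alternative form, using simplified stand-in types (the real `Duration` lives in the `window` module; this is illustrative only, not part of the commit):

// Simplified stand-ins for WindowDuration and window::Duration.
#[derive(Debug, Clone, Copy)]
enum WindowDuration {
    Variable { months: i64, negative: bool },
    Fixed { nanoseconds: i64 },
}

#[derive(Debug, PartialEq)]
struct Duration {
    months: i64,
    negative: bool,
    nsecs: i64,
}

// From<&WindowDuration> automatically provides
// Into<Duration> for &WindowDuration via the blanket impl.
impl From<&WindowDuration> for Duration {
    fn from(wd: &WindowDuration) -> Self {
        match *wd {
            WindowDuration::Variable { months, negative } => Duration {
                months,
                negative,
                nsecs: 0,
            },
            WindowDuration::Fixed { nanoseconds } => Duration {
                months: 0,
                negative: false,
                nsecs: nanoseconds,
            },
        }
    }
}

fn main() {
    let every = WindowDuration::Fixed { nanoseconds: 200 };
    let d: Duration = (&every).into(); // provided by the From impl
    assert_eq!(d.nsecs, 200);
}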
@@ -55,6 +55,16 @@ impl Duration {
         }
     }
 
+    /// create a duration from a non-negative value of months and a negative flag
+    pub fn from_months_with_negative(months: i64, negative: bool) -> Self {
+        assert_eq!(months < 0, negative);
+        Self {
+            months,
+            negative,
+            nsecs: 0,
+        }
+    }
+
     /// IsZero returns true if this is a zero duration.
     ///
     /// Original: https://github.com/influxdata/flux/blob/1e9bfd49f21c0e679b42acf6fc515ce05c6dec2b/values/time.go#L204
@@ -256,8 +266,8 @@ impl Add<Duration> for i64 {
 /// Original: https://github.com/influxdata/flux/blob/1e9bfd49f21c0e679b42acf6fc515ce05c6dec2b/execute/bounds.go#L19
 #[derive(Debug, Clone, Copy, PartialEq)]
 pub struct Bounds {
-    start: i64,
-    stop: i64,
+    pub start: i64,
+    pub stop: i64,
 }
 
 /// Represents a window in time
@@ -311,6 +321,7 @@ impl Window {
         self.every.truncate(t)
     }
 }
+
 /// truncateByNsecs will truncate the time to the given number
 /// of nanoseconds.
 ///
@@ -25,7 +25,7 @@ use generated_types::{
     storage_client::StorageClient,
     Aggregate, MeasurementFieldsRequest, MeasurementNamesRequest, MeasurementTagKeysRequest,
     MeasurementTagValuesRequest, Node, Predicate, ReadFilterRequest, ReadGroupRequest, ReadSource,
-    Tag, TagKeysRequest, TagValuesRequest, TimestampRange,
+    ReadWindowAggregateRequest, Tag, TagKeysRequest, TagValuesRequest, TimestampRange,
 };
 use prost::Message;
 use std::convert::TryInto;
@@ -241,24 +241,7 @@ async fn read_and_write_data() -> Result<()> {
     };
     let range = Some(range);
 
-    let predicate = Predicate {
-        root: Some(Node {
-            node_type: NodeType::ComparisonExpression as i32,
-            children: vec![
-                Node {
-                    node_type: NodeType::TagRef as i32,
-                    children: vec![],
-                    value: Some(Value::TagRefValue("host".into())),
-                },
-                Node {
-                    node_type: NodeType::Literal as i32,
-                    children: vec![],
-                    value: Some(Value::StringValue("server01".into())),
-                },
-            ],
-            value: Some(Value::Comparison(Comparison::Equal as _)),
-        }),
-    };
+    let predicate = make_tag_predicate("host", "server01");
     let predicate = Some(predicate);
 
     let read_filter_request = tonic::Request::new(ReadFilterRequest {
@@ -464,6 +447,15 @@ async fn read_and_write_data() -> Result<()> {
 
     test_http_error_messages(&client2).await?;
 
+    test_read_window_aggregate(
+        &mut storage_client,
+        &client2,
+        &read_source,
+        org_id_str,
+        bucket_id_str,
+    )
+    .await;
+
     Ok(())
 }
 
@@ -482,6 +474,107 @@ async fn test_http_error_messages(client: &influxdb2_client::Client) -> Result<()> {
     Ok(())
 }
 
+// Standalone test that all the pipes are hooked up for read window aggregate
+async fn test_read_window_aggregate(
+    storage_client: &mut StorageClient<tonic::transport::Channel>,
+    client: &influxdb2_client::Client,
+    read_source: &std::option::Option<prost_types::Any>,
+    org_id: &str,
+    bucket_id: &str,
+) {
+    let line_protocol = vec![
+        "h2o,state=MA,city=Boston temp=70.0 100",
+        "h2o,state=MA,city=Boston temp=71.0 200",
+        "h2o,state=MA,city=Boston temp=72.0 300",
+        "h2o,state=MA,city=Boston temp=73.0 400",
+        "h2o,state=MA,city=Boston temp=74.0 500",
+        "h2o,state=MA,city=Cambridge temp=80.0 100",
+        "h2o,state=MA,city=Cambridge temp=81.0 200",
+        "h2o,state=MA,city=Cambridge temp=82.0 300",
+        "h2o,state=MA,city=Cambridge temp=83.0 400",
+        "h2o,state=MA,city=Cambridge temp=84.0 500",
+        "h2o,state=CA,city=LA temp=90.0 100",
+        "h2o,state=CA,city=LA temp=91.0 200",
+        "h2o,state=CA,city=LA temp=92.0 300",
+        "h2o,state=CA,city=LA temp=93.0 400",
+        "h2o,state=CA,city=LA temp=94.0 500",
+    ]
+    .join("\n");
+
+    client
+        .write_line_protocol(org_id, bucket_id, line_protocol)
+        .await
+        .expect("Wrote h20 line protocol");
+
+    // now, query using read window aggregate
+
+    let request = ReadWindowAggregateRequest {
+        read_source: read_source.clone(),
+        range: Some(TimestampRange {
+            start: 200,
+            end: 1000,
+        }),
+        predicate: Some(make_tag_predicate("state", "MA")),
+        window_every: 200,
+        offset: 0,
+        aggregate: vec![Aggregate {
+            r#type: AggregateType::Sum as i32,
+        }],
+        window: None,
+    };
+
+    let response = storage_client.read_window_aggregate(request).await.unwrap();
+
+    let responses: Vec<_> = response.into_inner().try_collect().await.unwrap();
+
+    let frames: Vec<_> = responses
+        .into_iter()
+        .flat_map(|r| r.frames)
+        .flat_map(|f| f.data)
+        .collect();
+
+    let expected_frames = vec![
+        "GroupFrame, tag_keys: city,state, partition_key_vals: Boston,MA",
+        "SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
+        "FloatPointsFrame, timestamps: [400, 600], values: \"143,147\"",
+        "GroupFrame, tag_keys: city,state, partition_key_vals: Cambridge,MA",
+        "SeriesFrame, tags: _field=temp,_measurement=h2o,city=Cambridge,state=MA, type: 0",
+        "FloatPointsFrame, timestamps: [400, 600], values: \"163,167\"",
+    ];
+
+    let actual_frames = dump_data_frames(&frames);
+
+    assert_eq!(
+        expected_frames,
+        actual_frames,
+        "Expected:\n{}\nActual:\n{}",
+        expected_frames.join("\n"),
+        actual_frames.join("\n")
+    );
+}
+
+/// Create a predicate representing tag_name=tag_value in the horrible gRPC structs
+fn make_tag_predicate(tag_name: impl Into<String>, tag_value: impl Into<String>) -> Predicate {
+    Predicate {
+        root: Some(Node {
+            node_type: NodeType::ComparisonExpression as i32,
+            children: vec![
+                Node {
+                    node_type: NodeType::TagRef as i32,
+                    children: vec![],
+                    value: Some(Value::TagRefValue(tag_name.into().into())),
+                },
+                Node {
+                    node_type: NodeType::Literal as i32,
+                    children: vec![],
+                    value: Some(Value::StringValue(tag_value.into())),
+                },
+            ],
+            value: Some(Value::Comparison(Comparison::Equal as _)),
+        }),
+    }
+}
+
 /// substitutes "ns" --> ns_since_epoch, ns1-->ns_since_epoch+1, etc
 fn substitute_nanos(ns_since_epoch: i64, lines: &[&str]) -> Vec<String> {
     let substitutions = vec![
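The expected FloatPointsFrame values can be recomputed by hand from the line protocol: the request covers timestamps in [200, 1000) with 200ns windows and no offset, so Boston contributes 71+72=143 at window stop 400 and 73+74=147 at stop 600 (Cambridge: 163 and 167). A minimal sketch of that check; the half-open range and exclusive stop bound are assumptions consistent with the expected output, not taken from the gRPC spec:

use std::collections::BTreeMap;

// Recompute the per-window sums for the Boston series from the test data.
fn main() {
    let boston = [(100i64, 70.0f64), (200, 71.0), (300, 72.0), (400, 73.0), (500, 74.0)];
    let (start, end, every) = (200i64, 1000i64, 200i64);

    let mut sums: BTreeMap<i64, f64> = BTreeMap::new();
    for &(t, temp) in &boston {
        if t < start || t >= end {
            continue; // outside the requested TimestampRange
        }
        // window stop for window_every=200, offset=0
        let stop = t - t.rem_euclid(every) + every;
        *sums.entry(stop).or_insert(0.0) += temp;
    }

    // Matches "timestamps: [400, 600], values: 143,147" for Boston.
    assert_eq!(sums, BTreeMap::from([(400, 143.0), (600, 147.0)]));
}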
@@ -1,6 +1,8 @@
 use generated_types::wal as wb;
 use influxdb_line_protocol::ParsedLine;
+use query::group_by::Aggregate;
 use query::group_by::GroupByAndAggregate;
+use query::group_by::WindowDuration;
 use query::{
     exec::{
         stringset::StringSet, FieldListPlan, GroupedSeriesSetPlan, GroupedSeriesSetPlans,
@@ -346,7 +348,7 @@ impl Db {
 impl Database for Db {
     type Error = Error;
 
-    // TODO: writes lines creates a column named "time" for the timestmap data. If
+    // TODO: writes lines creates a column named "time" for the timestamp data. If
     // we keep this we need to validate that no tag or field has the same name.
     async fn write_lines(&self, lines: &[ParsedLine<'_>]) -> Result<(), Self::Error> {
         let data = split_lines_into_write_entry_partitions(partition_key, lines);
@@ -477,7 +479,7 @@ impl Database for Db {
         predicate: Predicate,
         gby_agg: GroupByAndAggregate,
     ) -> Result<GroupedSeriesSetPlans, Self::Error> {
-        let filter = PartitionTableFilter::new(predicate);
+        let mut filter = PartitionTableFilter::new(predicate);
 
         match gby_agg {
             GroupByAndAggregate::Columns {
@@ -492,12 +494,10 @@ impl Database for Db {
                 self.visit_tables(&mut filter, &mut visitor).await?;
                 Ok(visitor.plans.into())
             }
-            GroupByAndAggregate::Window {
-                agg: _agg,
-                every: _every,
-                offset: _offset,
-            } => {
-                unimplemented!("query groups with window aggregates are not yet implemented");
+            GroupByAndAggregate::Window { agg, every, offset } => {
+                let mut visitor = WindowGroupsVisitor::new(agg, every, offset);
+                self.visit_tables(&mut filter, &mut visitor).await?;
+                Ok(visitor.plans.into())
             }
         }
     }
@@ -1087,6 +1087,46 @@ impl Visitor for GroupsVisitor {
     }
 }
 
+/// Return DataFusion plans to calculate series that pass the
+/// specified predicate, grouped using the window definition
+struct WindowGroupsVisitor {
+    agg: Aggregate,
+    every: WindowDuration,
+    offset: WindowDuration,
+
+    plans: Vec<GroupedSeriesSetPlan>,
+}
+
+impl WindowGroupsVisitor {
+    fn new(agg: Aggregate, every: WindowDuration, offset: WindowDuration) -> Self {
+        Self {
+            agg,
+            every,
+            offset,
+            plans: Vec::new(),
+        }
+    }
+}
+
+impl Visitor for WindowGroupsVisitor {
+    fn pre_visit_table(
+        &mut self,
+        table: &Table,
+        partition: &Partition,
+        filter: &mut PartitionTableFilter,
+    ) -> Result<()> {
+        self.plans.push(table.window_grouped_series_set_plan(
+            filter.partition_predicate(),
+            &self.agg,
+            &self.every,
+            &self.offset,
+            partition,
+        )?);
+
+        Ok(())
+    }
+}
+
 // partition_key returns the partition key for the given line. The key will be the prefix of a
 // partition name (multiple partitions can exist for each key). It uses the user defined
 // partitioning rules to construct this key
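WindowGroupsVisitor follows the same shape as the existing GroupsVisitor: Db::visit_tables drives the traversal, and the visitor accumulates one plan per matching table. A stripped-down, self-contained model of that pattern (simplified stand-in types, not the IOx signatures):

// Minimal model of the visitor pattern used by Db::visit_tables:
// the walker owns traversal; visitors accumulate per-table results.
struct Table {
    name: String,
}

trait Visitor {
    fn pre_visit_table(&mut self, table: &Table) -> Result<(), String>;
}

// Analogous to WindowGroupsVisitor: holds the aggregate parameters and
// collects one "plan" (here just a description string) per table.
struct WindowGroupsVisitor {
    every_nanos: i64,
    plans: Vec<String>,
}

impl Visitor for WindowGroupsVisitor {
    fn pre_visit_table(&mut self, table: &Table) -> Result<(), String> {
        self.plans
            .push(format!("window plan for {} (every {}ns)", table.name, self.every_nanos));
        Ok(())
    }
}

// Analogous to Db::visit_tables: drive any visitor over all tables.
fn visit_tables(tables: &[Table], visitor: &mut dyn Visitor) -> Result<(), String> {
    for table in tables {
        visitor.pre_visit_table(table)?;
    }
    Ok(())
}

fn main() {
    let tables = vec![Table { name: "h2o".into() }, Table { name: "cpu".into() }];
    let mut visitor = WindowGroupsVisitor { every_nanos: 200, plans: vec![] };
    visit_tables(&tables, &mut visitor).unwrap();
    assert_eq!(visitor.plans.len(), 2);
}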
@@ -1,5 +1,7 @@
 use generated_types::wal as wb;
+use query::exec::make_window_bound_expr;
 use query::exec::{make_schema_pivot, GroupedSeriesSetPlan, SeriesSetPlan};
+use query::group_by::{Aggregate, WindowDuration};
 use tracing::debug;
 
 use std::{collections::BTreeSet, collections::HashMap, sync::Arc};
@@ -21,10 +23,10 @@ use arrow_deps::{
         datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema},
         record_batch::RecordBatch,
     },
-    datafusion,
-    datafusion::logical_plan::Expr,
-    datafusion::logical_plan::LogicalPlan,
-    datafusion::logical_plan::LogicalPlanBuilder,
+    datafusion::{
+        self,
+        logical_plan::{col, Expr, LogicalPlan, LogicalPlanBuilder},
+    },
 };
 
 #[derive(Debug, Snafu)]
@@ -160,6 +162,9 @@ pub enum Error {
         all_tag_column_names: String,
     },
 
+    #[snafu(display("Error creating aggregate expression: {}", source))]
+    CreatingAggregates { source: query::group_by::Error },
+
     #[snafu(display("Duplicate group column '{}'", column_name))]
     DuplicateGroupColumn { column_name: String },
 }
@@ -452,14 +457,6 @@ impl Table {
         prefix_columns: Option<&[String]>,
         partition: &Partition,
     ) -> Result<SeriesSetPlan> {
-        // I wonder if all this string creation will be too slow?
-        let table_name = partition
-            .dictionary
-            .lookup_id(self.id)
-            .expect("looking up table name in dictionary")
-            .to_string();
-
-        let table_name = Arc::new(table_name);
         let (mut tag_columns, field_columns) =
             self.tag_and_field_column_names(partition_predicate, partition)?;
 
@@ -507,13 +504,25 @@ impl Table {
         let plan = plan_builder.build().context(BuildingPlan)?;
 
         Ok(SeriesSetPlan {
-            table_name,
+            table_name: self.table_name(partition),
             plan,
             tag_columns,
             field_columns,
         })
     }
 
+    /// Look up this table's name as a string
+    fn table_name(&self, partition: &Partition) -> Arc<String> {
+        // I wonder if all this string creation will be too slow?
+        let table_name = partition
+            .dictionary
+            .lookup_id(self.id)
+            .expect("looking up table name in dictionary")
+            .to_string();
+
+        Arc::new(table_name)
+    }
+
     /// Creates a GroupedSeriesSet plan that produces an output table with rows that match the predicate
     ///
     /// The output looks like:
@@ -547,6 +556,112 @@ impl Table {
         })
     }
 
+    /// Creates a GroupedSeriesSet plan that produces an output table with rows
+    /// that are grouped by window definitions
+    ///
+    /// The order of the tag_columns
+    ///
+    /// The data is sorted on (tag_col1, tag_col2, ...) so that all
+    /// rows for a particular series (groups where all tags are the
+    /// same) occur together in the plan
+    ///
+    /// Equivalent to this SQL query
+    ///
+    /// SELECT tag1, ... tagN,
+    ///     window_bound(time, every, offset) as time,
+    ///     agg_function1(field) as field_name
+    /// FROM measurement
+    /// GROUP BY
+    ///     tag1, ... tagN,
+    ///     window_bound(time, every, offset) as time,
+    /// ORDER BY
+    ///     tag1, ... tagN,
+    ///     window_bound(time, every, offset) as time
+    ///
+    /// The created plan looks like:
+    ///
+    ///  OrderBy(gby: tag columns, window_function; agg: aggregate(field))
+    ///    GroupBy(gby: tag columns, window_function; agg: aggregate(field))
+    ///      Filter(predicate)
+    ///        InMemoryScan
+    ///
+    pub fn window_grouped_series_set_plan(
+        &self,
+        partition_predicate: &PartitionPredicate,
+        agg: &Aggregate,
+        every: &WindowDuration,
+        offset: &WindowDuration,
+        partition: &Partition,
+    ) -> Result<GroupedSeriesSetPlan> {
+        let (tag_columns, field_columns) =
+            self.tag_and_field_column_names(partition_predicate, partition)?;
+
+        // TODO avoid materializing all the columns here (ideally
+        // DataFusion can prune some of them out)
+        let data = self.all_to_arrow(partition)?;
+
+        let schema = data.schema();
+
+        let projection = None;
+        let projected_schema = schema.clone();
+
+        // And build the plan from the bottom up
+        let plan_builder = LogicalPlanBuilder::from(&LogicalPlan::InMemoryScan {
+            data: vec![vec![data]],
+            schema,
+            projection,
+            projected_schema,
+        });
+
+        // Filtering
+        let plan_builder = Self::add_datafusion_predicate(plan_builder, partition_predicate)?;
+
+        // Group by all tag columns and the window bounds
+        let mut group_exprs = tag_columns
+            .iter()
+            .map(|tag_name| col(tag_name.as_ref()))
+            .collect::<Vec<_>>();
+        // add window_bound() call
+        let window_bound =
+            make_window_bound_expr(col(TIME_COLUMN_NAME), every, offset).alias(TIME_COLUMN_NAME);
+        group_exprs.push(window_bound);
+
+        // aggregate each field
+        let agg_exprs = field_columns
+            .iter()
+            .map(|field_name| {
+                agg.to_datafusion_expr(col(field_name.as_ref()))
+                    .context(CreatingAggregates)
+                    .map(|agg| agg.alias(field_name.as_ref()))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // sort by the group by expressions as well
+        let sort_exprs = group_exprs
+            .iter()
+            .map(|expr| expr.into_sort_expr())
+            .collect::<Vec<_>>();
+
+        let plan_builder = plan_builder
+            .aggregate(group_exprs, agg_exprs)
+            .context(BuildingPlan)?
+            .sort(sort_exprs)
+            .context(BuildingPlan)?;
+
+        // and finally create the plan
+        let plan = plan_builder.build().context(BuildingPlan)?;
+
+        Ok(GroupedSeriesSetPlan {
+            num_prefix_tag_group_columns: tag_columns.len(),
+            series_set_plan: SeriesSetPlan {
+                table_name: self.table_name(partition),
+                plan,
+                tag_columns,
+                field_columns,
+            },
+        })
+    }
+
     /// Creates a plan that produces an output table with rows that
     /// match the predicate for all fields in the table.
     ///
@@ -598,8 +713,8 @@ impl Table {
     }
 
     // Returns (tag_columns, field_columns) vectors with the names of
-    // all tag and field columns, respectively. The vectors are sorted
-    // by name.
+    // all tag and field columns, respectively, after any predicates
+    // have been applied. The vectors are sorted lexically by name.
     fn tag_and_field_column_names(
         &self,
         partition_predicate: &PartitionPredicate,
@@ -1016,9 +1131,17 @@ impl IntoExpr for str {
     }
 }
 
+impl IntoExpr for Expr {
+    fn into_expr(&self) -> Expr {
+        self.clone()
+    }
+}
+
 #[cfg(test)]
 mod tests {
 
     use arrow::util::pretty::pretty_format_batches;
+    use arrow_deps::datafusion::logical_plan::{binary_expr, col, lit};
     use data_types::data::split_lines_into_write_entry_partitions;
+    use datafusion::{logical_plan::Operator, scalar::ScalarValue};
     use influxdb_line_protocol::{parse_lines, ParsedLine};
@@ -1029,7 +1152,6 @@ mod tests {
 
     #[test]
     fn test_has_columns() {
-        // setup a test table
         let mut partition = Partition::new("dummy_partition_key");
         let dictionary = &mut partition.dictionary;
         let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
@@ -1072,7 +1194,6 @@ mod tests {
 
     #[test]
     fn test_matches_table_name_predicate() {
-        // setup a test table
         let mut partition = Partition::new("dummy_partition_key");
         let dictionary = &mut partition.dictionary;
         let mut table = Table::new(dictionary.lookup_value_or_insert("h2o"));
@@ -1103,7 +1224,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_series_set_plan() {
-        // setup a test table
         let mut partition = Partition::new("dummy_partition_key");
         let dictionary = &mut partition.dictionary;
         let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
@@ -1153,7 +1273,6 @@ mod tests {
     async fn test_series_set_plan_order() {
         // test that the columns and rows come out in the right order (tags then timestamp)
 
-        // setup a test table
         let mut partition = Partition::new("dummy_partition_key");
         let dictionary = &mut partition.dictionary;
         let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
@@ -1206,7 +1325,6 @@ mod tests {
     async fn test_series_set_plan_filter() {
         // test that filters are applied reasonably
 
-        // setup a test table
         let mut partition = Partition::new("dummy_partition_key");
         let dictionary = &mut partition.dictionary;
         let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
@@ -1261,9 +1379,6 @@ mod tests {
 
     #[tokio::test]
    async fn test_grouped_series_set_plan() {
-        // test that filters are applied reasonably
-
-        // setup a test table
         let mut partition = Partition::new("dummy_partition_key");
         let dictionary = &mut partition.dictionary;
         let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
@@ -1309,9 +1424,131 @@ mod tests {
         assert_eq!(expected, results, "expected output");
     }
 
+    #[tokio::test]
+    async fn test_grouped_window_series_set_plan_nanoseconds() {
+        let mut partition = Partition::new("dummy_partition_key");
+        let dictionary = &mut partition.dictionary;
+        let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
+
+        let lp_lines = vec![
+            "h2o,state=MA,city=Boston temp=70.0 100",
+            "h2o,state=MA,city=Boston temp=71.0 200",
+            "h2o,state=MA,city=Boston temp=72.0 300",
+            "h2o,state=MA,city=Boston temp=73.0 400",
+            "h2o,state=MA,city=Boston temp=74.0 500",
+            "h2o,state=MA,city=Cambridge temp=80.0 100",
+            "h2o,state=MA,city=Cambridge temp=81.0 200",
+            "h2o,state=MA,city=Cambridge temp=82.0 300",
+            "h2o,state=MA,city=Cambridge temp=83.0 400",
+            "h2o,state=MA,city=Cambridge temp=84.0 500",
+            "h2o,state=CA,city=LA temp=90.0 100",
+            "h2o,state=CA,city=LA temp=91.0 200",
+            "h2o,state=CA,city=LA temp=92.0 300",
+            "h2o,state=CA,city=LA temp=93.0 400",
+            "h2o,state=CA,city=LA temp=94.0 500",
+        ];
+
+        write_lines_to_table(&mut table, dictionary, lp_lines);
+
+        let predicate = PredicateBuilder::default()
+            .add_expr(or(
+                binary_expr(col("city"), Operator::Eq, lit("Boston")),
+                binary_expr(col("city"), Operator::Eq, lit("LA")),
+            ))
+            .timestamp_range(100, 450)
+            .build();
+        let partition_predicate = partition.compile_predicate(&predicate).unwrap();
+
+        let agg = Aggregate::Mean;
+        let every = WindowDuration::from_nanoseconds(200);
+        let offset = WindowDuration::from_nanoseconds(0);
+
+        let plan = table
+            .window_grouped_series_set_plan(&partition_predicate, &agg, &every, &offset, &partition)
+            .expect("creating the grouped_series set plan");
+
+        assert_eq!(
+            plan.series_set_plan.tag_columns,
+            *str_vec_to_arc_vec(&["city", "state"])
+        );
+        assert_eq!(
+            plan.series_set_plan.field_columns,
+            *str_vec_to_arc_vec(&["temp"])
+        );
+
+        // run the created plan, ensuring the output is as expected
+        let results = run_plan(plan.series_set_plan.plan).await;
+
+        // note the name of the field is "temp" even though it is the average
+        let expected = vec![
+            "+--------+-------+------+------+",
+            "| city   | state | time | temp |",
+            "+--------+-------+------+------+",
+            "| Boston | MA    | 200  | 70   |",
+            "| Boston | MA    | 400  | 71.5 |",
+            "| Boston | MA    | 600  | 73   |",
+            "| LA     | CA    | 200  | 90   |",
+            "| LA     | CA    | 400  | 91.5 |",
+            "| LA     | CA    | 600  | 93   |",
+            "+--------+-------+------+------+",
+        ];
+
+        assert_eq!(expected, results, "expected output");
+    }
+
+    #[tokio::test]
+    async fn test_grouped_window_series_set_plan_months() {
+        let mut partition = Partition::new("dummy_partition_key");
+        let dictionary = &mut partition.dictionary;
+        let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
+
+        let lp_lines = vec![
+            "h2o,state=MA,city=Boston temp=70.0 1583020800000000000", // 2020-03-01T00:00:00Z
+            "h2o,state=MA,city=Boston temp=71.0 1583107920000000000", // 2020-03-02T00:12:00Z
+            "h2o,state=MA,city=Boston temp=72.0 1585699200000000000", // 2020-04-01T00:00:00Z
+            "h2o,state=MA,city=Boston temp=73.0 1585785600000000000", // 2020-04-02T00:00:00Z
+        ];
+
+        write_lines_to_table(&mut table, dictionary, lp_lines);
+
+        let predicate = PredicateBuilder::default().build();
+        let partition_predicate = partition.compile_predicate(&predicate).unwrap();
+
+        let agg = Aggregate::Mean;
+        let every = WindowDuration::from_months(1, false);
+        let offset = WindowDuration::from_months(0, false);
+
+        let plan = table
+            .window_grouped_series_set_plan(&partition_predicate, &agg, &every, &offset, &partition)
+            .expect("creating the grouped_series set plan");
+
+        assert_eq!(
+            plan.series_set_plan.tag_columns,
+            *str_vec_to_arc_vec(&["city", "state"])
+        );
+        assert_eq!(
+            plan.series_set_plan.field_columns,
+            *str_vec_to_arc_vec(&["temp"])
+        );
+
+        // run the created plan, ensuring the output is as expected
+        let results = run_plan(plan.series_set_plan.plan).await;
+
+        // note the name of the field is "temp" even though it is the average
+        let expected = vec![
+            "+--------+-------+---------------------+------+",
+            "| city   | state | time                | temp |",
+            "+--------+-------+---------------------+------+",
+            "| Boston | MA    | 1585699200000000000 | 70.5 |",
+            "| Boston | MA    | 1588291200000000000 | 72.5 |",
+            "+--------+-------+---------------------+------+",
+        ];
+
+        assert_eq!(expected, results, "expected output");
+    }
+
     #[tokio::test]
     async fn test_field_name_plan() {
-        // setup a test table
         let mut partition = Partition::new("dummy_partition_key");
         let dictionary = &mut partition.dictionary;
         let mut table = Table::new(dictionary.lookup_value_or_insert("table_name"));
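As a cross-check on the expected table in test_grouped_window_series_set_plan_nanoseconds: the predicate keeps timestamps in [100, 450), so the stop-600 window sees only t=400 for each city, which is why its "mean" is a single point's value. A minimal sketch reproducing the Boston rows; the half-open range semantics are an assumption consistent with the expected output:

use std::collections::BTreeMap;

// Recompute the per-window means for the Boston series from the test data.
fn main() {
    let boston = [(100i64, 70.0f64), (200, 71.0), (300, 72.0), (400, 73.0), (500, 74.0)];
    let every = 200i64;

    let mut acc: BTreeMap<i64, (f64, usize)> = BTreeMap::new();
    for &(t, temp) in boston.iter().filter(|&&(t, _)| (100i64..450).contains(&t)) {
        // window stop for every=200ns, offset=0
        let stop = t - t.rem_euclid(every) + every;
        let e = acc.entry(stop).or_insert((0.0, 0));
        e.0 += temp;
        e.1 += 1;
    }

    let means: Vec<(i64, f64)> = acc
        .into_iter()
        .map(|(stop, (sum, n))| (stop, sum / n as f64))
        .collect();
    // Matches the Boston rows: (200, 70.0), (400, 71.5), (600, 73.0).
    assert_eq!(means, vec![(200, 70.0), (400, 71.5), (600, 73.0)]);
}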
@@ -1478,4 +1715,13 @@ mod tests {
     fn partition_key_func(_: &ParsedLine<'_>) -> String {
         String::from("the_partition_key")
     }
+
+    /// return a new expression with a logical OR
+    fn or(left: Expr, right: Expr) -> Expr {
+        Expr::BinaryExpr {
+            left: Box::new(left),
+            op: Operator::Or,
+            right: Box::new(right),
+        }
+    }
 }