parent
e1825ec45b
commit
2cfb30d5df
|
@ -69,9 +69,7 @@ use itertools::Itertools;
|
|||
use observability_deps::tracing::debug;
|
||||
use query_functions::{
|
||||
clean_non_meta_escapes,
|
||||
selectors::{
|
||||
struct_selector_first, struct_selector_last, struct_selector_max, struct_selector_min,
|
||||
},
|
||||
selectors::{selector_first, selector_last, selector_max, selector_min},
|
||||
};
|
||||
use schema::{
|
||||
InfluxColumnType, InfluxFieldType, Schema, INFLUXQL_MEASUREMENT_COLUMN_NAME,
|
||||
|
@ -1175,10 +1173,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
|||
}
|
||||
|
||||
let selector_udf = match name {
|
||||
"first" => struct_selector_first(),
|
||||
"last" => struct_selector_last(),
|
||||
"max" => struct_selector_max(),
|
||||
"min" => struct_selector_min(),
|
||||
"first" => selector_first(),
|
||||
"last" => selector_last(),
|
||||
"max" => selector_max(),
|
||||
"min" => selector_min(),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
.call(vec![expr, "time".as_expr()]);
|
||||
|
|
|
@ -38,9 +38,7 @@ use predicate::{
|
|||
use query_functions::{
|
||||
group_by::{Aggregate, WindowDuration},
|
||||
make_window_bound_expr,
|
||||
selectors::{
|
||||
struct_selector_first, struct_selector_last, struct_selector_max, struct_selector_min,
|
||||
},
|
||||
selectors::{selector_first, selector_last, selector_max, selector_min},
|
||||
};
|
||||
use schema::{InfluxColumnType, Projection, Schema, TIME_COLUMN_NAME};
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
|
@ -1873,10 +1871,10 @@ fn make_agg_expr(agg: Aggregate, field_expr: FieldExpr<'_>) -> Result<Expr> {
|
|||
///
|
||||
fn make_selector_expr(agg: Aggregate, field: FieldExpr<'_>) -> Result<Expr> {
|
||||
let uda = match agg {
|
||||
Aggregate::First => struct_selector_first(),
|
||||
Aggregate::Last => struct_selector_last(),
|
||||
Aggregate::Min => struct_selector_min(),
|
||||
Aggregate::Max => struct_selector_max(),
|
||||
Aggregate::First => selector_first(),
|
||||
Aggregate::Last => selector_last(),
|
||||
Aggregate::Min => selector_min(),
|
||||
Aggregate::Max => selector_max(),
|
||||
_ => return InternalAggregateNotSelectorSnafu { agg }.fail(),
|
||||
};
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -17,13 +17,13 @@ use arrow::{
|
|||
max as array_max, max_boolean as array_max_boolean, max_string as array_max_string,
|
||||
min as array_min, min_boolean as array_min_boolean, min_string as array_min_string,
|
||||
},
|
||||
datatypes::{DataType, Field, Fields},
|
||||
datatypes::{Field, Fields},
|
||||
};
|
||||
use datafusion::{error::Result as DataFusionResult, scalar::ScalarValue};
|
||||
|
||||
use observability_deps::tracing::debug;
|
||||
|
||||
use super::{Selector, SelectorOutput};
|
||||
use super::Selector;
|
||||
|
||||
/// Trait for comparing values in arrays with their native
|
||||
/// representation. This so the same comparison expression can be used
|
||||
|
@ -117,7 +117,7 @@ fn make_scalar_struct(data_fields: Vec<ScalarValue>) -> ScalarValue {
|
|||
}
|
||||
|
||||
macro_rules! make_first_selector {
|
||||
($STRUCTNAME:ident, $RUSTTYPE:ident, $ARROWTYPE:expr, $ARRTYPE:ident, $MINFUNC:ident, $TO_SCALARVALUE: expr) => {
|
||||
($STRUCTNAME:ident, $RUSTTYPE:ident, $ARRTYPE:ident, $MINFUNC:ident, $TO_SCALARVALUE: expr) => {
|
||||
#[derive(Debug)]
|
||||
pub struct $STRUCTNAME {
|
||||
value: Option<$RUSTTYPE>,
|
||||
|
@ -134,10 +134,6 @@ macro_rules! make_first_selector {
|
|||
}
|
||||
|
||||
impl Selector for $STRUCTNAME {
|
||||
fn value_data_type() -> DataType {
|
||||
$ARROWTYPE
|
||||
}
|
||||
|
||||
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
|
||||
Ok(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
|
@ -145,15 +141,11 @@ macro_rules! make_first_selector {
|
|||
])
|
||||
}
|
||||
|
||||
fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult<ScalarValue> {
|
||||
match output {
|
||||
SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())),
|
||||
SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)),
|
||||
SelectorOutput::Struct => Ok(make_scalar_struct(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
ScalarValue::TimestampNanosecond(self.time, None),
|
||||
])),
|
||||
}
|
||||
fn evaluate(&self) -> DataFusionResult<ScalarValue> {
|
||||
Ok(make_scalar_struct(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
ScalarValue::TimestampNanosecond(self.time, None),
|
||||
]))
|
||||
}
|
||||
|
||||
fn update_batch(
|
||||
|
@ -233,7 +225,7 @@ macro_rules! make_first_selector {
|
|||
}
|
||||
|
||||
macro_rules! make_last_selector {
|
||||
($STRUCTNAME:ident, $RUSTTYPE:ident, $ARROWTYPE:expr, $ARRTYPE:ident, $MAXFUNC:ident, $TO_SCALARVALUE: expr) => {
|
||||
($STRUCTNAME:ident, $RUSTTYPE:ident, $ARRTYPE:ident, $MAXFUNC:ident, $TO_SCALARVALUE: expr) => {
|
||||
#[derive(Debug)]
|
||||
pub struct $STRUCTNAME {
|
||||
value: Option<$RUSTTYPE>,
|
||||
|
@ -250,10 +242,6 @@ macro_rules! make_last_selector {
|
|||
}
|
||||
|
||||
impl Selector for $STRUCTNAME {
|
||||
fn value_data_type() -> DataType {
|
||||
$ARROWTYPE
|
||||
}
|
||||
|
||||
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
|
||||
Ok(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
|
@ -261,15 +249,11 @@ macro_rules! make_last_selector {
|
|||
])
|
||||
}
|
||||
|
||||
fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult<ScalarValue> {
|
||||
match output {
|
||||
SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())),
|
||||
SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)),
|
||||
SelectorOutput::Struct => Ok(make_scalar_struct(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
ScalarValue::TimestampNanosecond(self.time, None),
|
||||
])),
|
||||
}
|
||||
fn evaluate(&self) -> DataFusionResult<ScalarValue> {
|
||||
Ok(make_scalar_struct(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
ScalarValue::TimestampNanosecond(self.time, None),
|
||||
]))
|
||||
}
|
||||
|
||||
fn update_batch(
|
||||
|
@ -373,7 +357,7 @@ impl ActionNeeded {
|
|||
}
|
||||
|
||||
macro_rules! make_min_selector {
|
||||
($STRUCTNAME:ident, $RUSTTYPE:ident, $ARROWTYPE:expr, $ARRTYPE:ident, $MINFUNC:ident, $TO_SCALARVALUE: expr) => {
|
||||
($STRUCTNAME:ident, $RUSTTYPE:ident, $ARRTYPE:ident, $MINFUNC:ident, $TO_SCALARVALUE: expr) => {
|
||||
#[derive(Debug)]
|
||||
pub struct $STRUCTNAME {
|
||||
value: Option<$RUSTTYPE>,
|
||||
|
@ -390,10 +374,6 @@ macro_rules! make_min_selector {
|
|||
}
|
||||
|
||||
impl Selector for $STRUCTNAME {
|
||||
fn value_data_type() -> DataType {
|
||||
$ARROWTYPE
|
||||
}
|
||||
|
||||
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
|
||||
Ok(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
|
@ -401,15 +381,11 @@ macro_rules! make_min_selector {
|
|||
])
|
||||
}
|
||||
|
||||
fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult<ScalarValue> {
|
||||
match output {
|
||||
SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())),
|
||||
SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)),
|
||||
SelectorOutput::Struct => Ok(make_scalar_struct(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
ScalarValue::TimestampNanosecond(self.time, None),
|
||||
])),
|
||||
}
|
||||
fn evaluate(&self) -> DataFusionResult<ScalarValue> {
|
||||
Ok(make_scalar_struct(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
ScalarValue::TimestampNanosecond(self.time, None),
|
||||
]))
|
||||
}
|
||||
|
||||
fn update_batch(
|
||||
|
@ -494,7 +470,7 @@ macro_rules! make_min_selector {
|
|||
}
|
||||
|
||||
macro_rules! make_max_selector {
|
||||
($STRUCTNAME:ident, $RUSTTYPE:ident, $ARROWTYPE:expr, $ARRTYPE:ident, $MAXFUNC:ident, $TO_SCALARVALUE: expr) => {
|
||||
($STRUCTNAME:ident, $RUSTTYPE:ident, $ARRTYPE:ident, $MAXFUNC:ident, $TO_SCALARVALUE: expr) => {
|
||||
#[derive(Debug)]
|
||||
pub struct $STRUCTNAME {
|
||||
value: Option<$RUSTTYPE>,
|
||||
|
@ -511,10 +487,6 @@ macro_rules! make_max_selector {
|
|||
}
|
||||
|
||||
impl Selector for $STRUCTNAME {
|
||||
fn value_data_type() -> DataType {
|
||||
$ARROWTYPE
|
||||
}
|
||||
|
||||
fn datafusion_state(&self) -> DataFusionResult<Vec<ScalarValue>> {
|
||||
Ok(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
|
@ -522,15 +494,11 @@ macro_rules! make_max_selector {
|
|||
])
|
||||
}
|
||||
|
||||
fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult<ScalarValue> {
|
||||
match output {
|
||||
SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())),
|
||||
SelectorOutput::Time => Ok(ScalarValue::TimestampNanosecond(self.time, None)),
|
||||
SelectorOutput::Struct => Ok(make_scalar_struct(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
ScalarValue::TimestampNanosecond(self.time, None),
|
||||
])),
|
||||
}
|
||||
fn evaluate(&self) -> DataFusionResult<ScalarValue> {
|
||||
Ok(make_scalar_struct(vec![
|
||||
$TO_SCALARVALUE(self.value.clone()),
|
||||
ScalarValue::TimestampNanosecond(self.time, None),
|
||||
]))
|
||||
}
|
||||
|
||||
fn update_batch(
|
||||
|
@ -620,7 +588,6 @@ macro_rules! make_max_selector {
|
|||
make_first_selector!(
|
||||
F64FirstSelector,
|
||||
f64,
|
||||
DataType::Float64,
|
||||
Float64Array,
|
||||
array_min,
|
||||
ScalarValue::Float64
|
||||
|
@ -628,7 +595,6 @@ make_first_selector!(
|
|||
make_first_selector!(
|
||||
I64FirstSelector,
|
||||
i64,
|
||||
DataType::Int64,
|
||||
Int64Array,
|
||||
array_min,
|
||||
ScalarValue::Int64
|
||||
|
@ -636,7 +602,6 @@ make_first_selector!(
|
|||
make_first_selector!(
|
||||
U64FirstSelector,
|
||||
u64,
|
||||
DataType::UInt64,
|
||||
UInt64Array,
|
||||
array_min,
|
||||
ScalarValue::UInt64
|
||||
|
@ -644,7 +609,6 @@ make_first_selector!(
|
|||
make_first_selector!(
|
||||
Utf8FirstSelector,
|
||||
String,
|
||||
DataType::Utf8,
|
||||
StringArray,
|
||||
array_min,
|
||||
ScalarValue::Utf8
|
||||
|
@ -652,7 +616,6 @@ make_first_selector!(
|
|||
make_first_selector!(
|
||||
BooleanFirstSelector,
|
||||
bool,
|
||||
DataType::Boolean,
|
||||
BooleanArray,
|
||||
array_min,
|
||||
ScalarValue::Boolean
|
||||
|
@ -663,7 +626,6 @@ make_first_selector!(
|
|||
make_last_selector!(
|
||||
F64LastSelector,
|
||||
f64,
|
||||
DataType::Float64,
|
||||
Float64Array,
|
||||
array_max,
|
||||
ScalarValue::Float64
|
||||
|
@ -671,7 +633,6 @@ make_last_selector!(
|
|||
make_last_selector!(
|
||||
I64LastSelector,
|
||||
i64,
|
||||
DataType::Int64,
|
||||
Int64Array,
|
||||
array_max,
|
||||
ScalarValue::Int64
|
||||
|
@ -679,7 +640,6 @@ make_last_selector!(
|
|||
make_last_selector!(
|
||||
U64LastSelector,
|
||||
u64,
|
||||
DataType::UInt64,
|
||||
UInt64Array,
|
||||
array_max,
|
||||
ScalarValue::UInt64
|
||||
|
@ -687,7 +647,6 @@ make_last_selector!(
|
|||
make_last_selector!(
|
||||
Utf8LastSelector,
|
||||
String,
|
||||
DataType::Utf8,
|
||||
StringArray,
|
||||
array_max,
|
||||
ScalarValue::Utf8
|
||||
|
@ -695,7 +654,6 @@ make_last_selector!(
|
|||
make_last_selector!(
|
||||
BooleanLastSelector,
|
||||
bool,
|
||||
DataType::Boolean,
|
||||
BooleanArray,
|
||||
array_max,
|
||||
ScalarValue::Boolean
|
||||
|
@ -706,7 +664,6 @@ make_last_selector!(
|
|||
make_min_selector!(
|
||||
F64MinSelector,
|
||||
f64,
|
||||
DataType::Float64,
|
||||
Float64Array,
|
||||
array_min,
|
||||
ScalarValue::Float64
|
||||
|
@ -714,7 +671,6 @@ make_min_selector!(
|
|||
make_min_selector!(
|
||||
I64MinSelector,
|
||||
i64,
|
||||
DataType::Int64,
|
||||
Int64Array,
|
||||
array_min,
|
||||
ScalarValue::Int64
|
||||
|
@ -722,7 +678,6 @@ make_min_selector!(
|
|||
make_min_selector!(
|
||||
U64MinSelector,
|
||||
u64,
|
||||
DataType::UInt64,
|
||||
UInt64Array,
|
||||
array_min,
|
||||
ScalarValue::UInt64
|
||||
|
@ -730,7 +685,6 @@ make_min_selector!(
|
|||
make_min_selector!(
|
||||
Utf8MinSelector,
|
||||
String,
|
||||
DataType::Utf8,
|
||||
StringArray,
|
||||
array_min_string,
|
||||
ScalarValue::Utf8
|
||||
|
@ -738,7 +692,6 @@ make_min_selector!(
|
|||
make_min_selector!(
|
||||
BooleanMinSelector,
|
||||
bool,
|
||||
DataType::Boolean,
|
||||
BooleanArray,
|
||||
array_min_boolean,
|
||||
ScalarValue::Boolean
|
||||
|
@ -749,7 +702,6 @@ make_min_selector!(
|
|||
make_max_selector!(
|
||||
F64MaxSelector,
|
||||
f64,
|
||||
DataType::Float64,
|
||||
Float64Array,
|
||||
array_max,
|
||||
ScalarValue::Float64
|
||||
|
@ -757,7 +709,6 @@ make_max_selector!(
|
|||
make_max_selector!(
|
||||
I64MaxSelector,
|
||||
i64,
|
||||
DataType::Int64,
|
||||
Int64Array,
|
||||
array_max,
|
||||
ScalarValue::Int64
|
||||
|
@ -765,7 +716,6 @@ make_max_selector!(
|
|||
make_max_selector!(
|
||||
U64MaxSelector,
|
||||
u64,
|
||||
DataType::UInt64,
|
||||
UInt64Array,
|
||||
array_max,
|
||||
ScalarValue::UInt64
|
||||
|
@ -773,7 +723,6 @@ make_max_selector!(
|
|||
make_max_selector!(
|
||||
Utf8MaxSelector,
|
||||
String,
|
||||
DataType::Utf8,
|
||||
StringArray,
|
||||
array_max_string,
|
||||
ScalarValue::Utf8
|
||||
|
@ -781,7 +730,6 @@ make_max_selector!(
|
|||
make_max_selector!(
|
||||
BooleanMaxSelector,
|
||||
bool,
|
||||
DataType::Boolean,
|
||||
BooleanArray,
|
||||
array_max_boolean,
|
||||
ScalarValue::Boolean
|
||||
|
|
Loading…
Reference in New Issue