diff --git a/iox_query_influxql/src/plan/expr_type_evaluator.rs b/iox_query_influxql/src/plan/expr_type_evaluator.rs index 82e9d3455e..fa5b1e2bc8 100644 --- a/iox_query_influxql/src/plan/expr_type_evaluator.rs +++ b/iox_query_influxql/src/plan/expr_type_evaluator.rs @@ -1,7 +1,7 @@ use crate::plan::field::field_by_name; use crate::plan::field_mapper::map_type; -use crate::plan::SchemaProvider; -use datafusion::common::{DataFusionError, Result}; +use crate::plan::{error, SchemaProvider}; +use datafusion::common::Result; use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName}; use influxdb_influxql_parser::expression::{Call, Expr, VarRef, VarRefDataType}; use influxdb_influxql_parser::literal::Literal; @@ -113,9 +113,7 @@ impl<'a> TypeEvaluator<'a> { } } _ => { - return Err(DataFusionError::Internal( - "eval_var_ref: Unexpected MeasurementSelection".to_string(), - )) + return error::internal("eval_var_ref: Unexpected MeasurementSelection") } } } diff --git a/iox_query_influxql/src/plan/field_mapper.rs b/iox_query_influxql/src/plan/field_mapper.rs index 3fdc8398e9..7b96ca8a98 100644 --- a/iox_query_influxql/src/plan/field_mapper.rs +++ b/iox_query_influxql/src/plan/field_mapper.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - use crate::plan::var_ref::field_type_to_var_ref_data_type; use crate::plan::SchemaProvider; use datafusion::common::Result; diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index 2b732c7a5d..a5b06d02a5 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -20,7 +20,7 @@ use arrow::record_batch::RecordBatch; use chrono_tz::Tz; use datafusion::catalog::TableReference; use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter}; -use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, ToDFSchema}; +use datafusion::common::{DFSchema, DFSchemaRef, Result, ScalarValue, ToDFSchema}; use datafusion::datasource::{provider_as_source, MemTable}; use datafusion::logical_expr::expr_rewriter::normalize_col; use datafusion::logical_expr::logical_plan::builder::project; @@ -214,9 +214,8 @@ impl<'a> InfluxQLToLogicalPlan<'a> { // grouped into tables in the output when formatted as InfluxQL tabular format. let measurement_column_index = schema .index_of_column_by_name(None, "plan_type")? - .ok_or_else(|| { - DataFusionError::External("internal: unable to find plan_type column".into()) - })? as u32; + .ok_or_else(|| error::map::internal("unable to find plan_type column"))? + as u32; let (analyze, verbose) = match explain.options { Some(ExplainOption::AnalyzeVerbose) => (true, true), @@ -767,11 +766,11 @@ impl<'a> InfluxQLToLogicalPlan<'a> { let limit = limit .map(|v| >::try_into(*v)) .transpose() - .map_err(|_| DataFusionError::Plan("limit out of range".to_owned()))?; + .map_err(|_| error::map::query("limit out of range"))?; let offset = offset .map(|v| >::try_into(*v)) .transpose() - .map_err(|_| DataFusionError::Plan("offset out of range".to_owned()))?; + .map_err(|_| error::map::query("offset out of range".to_owned()))?; // a reference to the ROW_NUMBER column. let row_alias = IOX_ROW_ALIAS.as_expr(); @@ -835,7 +834,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { return Ok(input); } - Err(DataFusionError::NotImplemented("SLIMIT or SOFFSET".into())) + error::not_implemented("SLIMIT or SOFFSET") } /// Map the InfluxQL `SELECT` projection list into a list of DataFusion expressions. @@ -941,9 +940,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { let iox_schema = &schemas.iox_schema; match iql { // rewriter is expected to expand wildcard expressions - IQLExpr::Wildcard(_) => Err(DataFusionError::Internal( - "unexpected wildcard in projection".into(), - )), + IQLExpr::Wildcard(_) => error::internal("unexpected wildcard in projection"), IQLExpr::VarRef(VarRef { name, data_type: opt_dst_type, @@ -989,7 +986,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { }, }) } - IQLExpr::BindParameter(_) => Err(DataFusionError::NotImplemented("parameter".into())), + IQLExpr::BindParameter(_) => error::not_implemented("parameter"), IQLExpr::Literal(val) => match val { Literal::Integer(v) => Ok(lit(*v)), Literal::Unsigned(v) => Ok(lit(*v)), @@ -1000,19 +997,17 @@ impl<'a> InfluxQLToLogicalPlan<'a> { Some(v.timestamp()), None, ))), - Literal::Duration(_) => { - Err(DataFusionError::NotImplemented("duration literal".into())) - } + Literal::Duration(_) => error::not_implemented("duration literal"), Literal::Regex(re) => match ctx.scope { // a regular expression in a projection list is unexpected, // as it should have been expanded by the rewriter. - ExprScope::Projection => Err(DataFusionError::Internal( - "unexpected regular expression found in projection".into(), - )), + ExprScope::Projection => { + error::internal("unexpected regular expression found in projection") + } ExprScope::Where => Ok(lit(clean_non_meta_escapes(re.as_str()))), }, }, - IQLExpr::Distinct(_) => Err(DataFusionError::NotImplemented("DISTINCT".into())), + IQLExpr::Distinct(_) => error::not_implemented("DISTINCT"), IQLExpr::Call(call) => self.call_to_df_expr(ctx, call, schemas), IQLExpr::Binary(expr) => self.arithmetic_expr_to_df_expr(ctx, expr, schemas), IQLExpr::Nested(e) => self.expr_to_df_expr(ctx, e, schemas), @@ -1043,12 +1038,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { match ctx.scope { ExprScope::Where => { if call.name.eq_ignore_ascii_case("now") { - Err(DataFusionError::NotImplemented("now".into())) + error::not_implemented("now") } else { let name = &call.name; - Err(DataFusionError::External( - format!("invalid function call in condition: {name}").into(), - )) + error::query(format!("invalid function call in condition: {name}")) } } ExprScope::Projection => self.function_to_df_expr(ctx, call, schemas), @@ -1064,9 +1057,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fn check_arg_count(name: &str, args: &[IQLExpr], count: usize) -> Result<()> { let got = args.len(); if got != count { - Err(DataFusionError::Plan(format!( + error::query(format!( "invalid number of arguments for {name}: expected {count}, got {got}" - ))) + )) } else { Ok(()) } @@ -1164,9 +1157,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { match BuiltinScalarFunction::from_str(call.name.as_str())? { BuiltinScalarFunction::Log => { if args.len() != 2 { - Err(DataFusionError::Plan( - "invalid number of arguments for log, expected 2, got 1".to_owned(), - )) + error::query("invalid number of arguments for log, expected 2, got 1") } else { Ok(Expr::ScalarFunction { fun: BuiltinScalarFunction::Log, @@ -1233,13 +1224,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> { self.create_table_ref(normalize_identifier(ident)) } // rewriter is expected to expand the regular expression - MeasurementName::Regex(_) => Err(DataFusionError::Internal( - "unexpected regular expression in FROM clause".into(), - )), + MeasurementName::Regex(_) => error::internal( + "unexpected regular expression in FROM clause", + ), }, - MeasurementSelection::Subquery(_) => Err(DataFusionError::NotImplemented( - "subquery in FROM clause".into(), - )), + MeasurementSelection::Subquery(_) => error::not_implemented( + "subquery in FROM clause", + ), }? else { continue }; table_projs.push_back(table_proj); } @@ -1280,14 +1271,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { let mut out = HashSet::new(); for qualified_name in from.iter() { if qualified_name.database.is_some() { - return Err(DataFusionError::NotImplemented( - "database name in from clause".into(), - )); + return error::not_implemented("database name in from clause"); } if qualified_name.retention_policy.is_some() { - return Err(DataFusionError::NotImplemented( - "retention policy in from clause".into(), - )); + return error::not_implemented("retention policy in from clause"); } match &qualified_name.name { MeasurementName::Name(name) => { @@ -1320,9 +1307,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { ) -> Result { if show_field_keys.database.is_some() { // How do we handle this? Do we need to perform cross-namespace queries here? - return Err(DataFusionError::NotImplemented( - "SHOW FIELD KEYS ON ".into(), - )); + return error::not_implemented("SHOW FIELD KEYS ON "); } let field_key_col = "fieldKey"; @@ -1414,9 +1399,7 @@ fn build_gap_fill_node( // added by the planner. let (stride, time_range, origin) = if date_bin_args.len() == 3 { let time_col = date_bin_args[1].try_into_col().map_err(|_| { - DataFusionError::Internal( - "DATE_BIN requires a column as the source argument".to_string(), - ) + error::map::internal("DATE_BIN requires a column as the source argument") })?; // Ensure that a time range was specified and is valid for gap filling @@ -1443,10 +1426,10 @@ fn build_gap_fill_node( } else { // This is an internal error as the date_bin function is added by the planner and should // always contain the correct number of arguments. - return Err(DataFusionError::Internal(format!( + return error::internal(format!( "DATE_BIN expects 3 arguments, got {}", date_bin_args.len() - ))); + )); }; let aggr = Aggregate::try_from_plan(&input)?; @@ -1487,7 +1470,7 @@ fn build_gap_fill_node( fn plan_with_metadata(plan: LogicalPlan, metadata: &InfluxQlMetadata) -> Result { fn make_schema(schema: DFSchemaRef, metadata: &InfluxQlMetadata) -> Result { let data = serde_json::to_string(metadata).map_err(|err| { - DataFusionError::Internal(format!("error serializing InfluxQL metadata: {err}")) + error::map::internal(format!("error serializing InfluxQL metadata: {err}")) })?; let mut md = schema.metadata().clone(); @@ -1592,11 +1575,7 @@ fn plan_with_metadata(plan: LogicalPlan, metadata: &InfluxQlMetadata) -> Result< t.projected_schema = make_schema(Arc::clone(&src.projected_schema), metadata)?; LogicalPlan::TableScan(t) } - _ => { - return Err(DataFusionError::External( - format!("unexpected LogicalPlan: {}", input.display()).into(), - )) - } + _ => return error::internal(format!("unexpected LogicalPlan: {}", input.display())), }) } @@ -1705,9 +1684,7 @@ fn conditional_op_to_operator(op: ConditionalOperator) -> Result { ConditionalOperator::And => Ok(Operator::And), ConditionalOperator::Or => Ok(Operator::Or), // NOTE: This is not supported by InfluxQL SELECT expressions, so it is unexpected - ConditionalOperator::In => Err(DataFusionError::Internal( - "unexpected binary operator: IN".into(), - )), + ConditionalOperator::In => error::internal("unexpected binary operator: IN"), } } @@ -1815,7 +1792,7 @@ fn is_time_field(cond: &ConditionalExpression) -> bool { fn find_expr(cond: &ConditionalExpression) -> Result<&IQLExpr> { cond.expr() - .ok_or_else(|| DataFusionError::Internal("incomplete conditional expression".into())) + .ok_or_else(|| error::map::internal("incomplete conditional expression")) } #[cfg(test)] diff --git a/iox_query_influxql/src/plan/planner_time_range_expression.rs b/iox_query_influxql/src/plan/planner_time_range_expression.rs index b1cc29be38..06bbed1711 100644 --- a/iox_query_influxql/src/plan/planner_time_range_expression.rs +++ b/iox_query_influxql/src/plan/planner_time_range_expression.rs @@ -1,8 +1,9 @@ +use crate::plan::error; use crate::plan::timestamp::parse_timestamp; use crate::plan::util::binary_operator_to_df_operator; use datafusion::common::{DataFusionError, Result, ScalarValue}; use datafusion::logical_expr::{binary_expr, lit, now, BinaryExpr, Expr as DFExpr, Operator}; -use influxdb_influxql_parser::expression::{Binary, BinaryOperator}; +use influxdb_influxql_parser::expression::{Binary, BinaryOperator, Call}; use influxdb_influxql_parser::{expression::Expr, literal::Literal}; type ExprResult = Result; @@ -62,11 +63,7 @@ pub(in crate::plan) fn time_range_to_df_expr(expr: &Expr, tz: Option { DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(v), None)) } - _ => { - return Err(DataFusionError::Plan( - "invalid time range expression".into(), - )) - } + _ => return error::query("invalid time range expression"), }) } @@ -85,13 +82,13 @@ pub(super) fn duration_expr_to_nanoseconds(expr: &Expr) -> Result { DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(v))) => Ok(v as i64), DFExpr::Literal(ScalarValue::Float64(Some(v))) => Ok(v as i64), DFExpr::Literal(ScalarValue::Int64(Some(v))) => Ok(v), - _ => Err(DataFusionError::Plan("invalid duration expression".into())), + _ => error::query("invalid duration expression"), } } fn map_expr_err(expr: &Expr) -> impl Fn(DataFusionError) -> DataFusionError + '_ { move |err| { - DataFusionError::Plan(format!( + error::map::query(format!( "invalid expression \"{}\": {}", expr, match err { @@ -105,11 +102,11 @@ fn map_expr_err(expr: &Expr) -> impl Fn(DataFusionError) -> DataFusionError + '_ fn reduce_expr(expr: &Expr, tz: Option) -> ExprResult { match expr { Expr::Binary(v) => reduce_binary_expr(v, tz).map_err(map_expr_err(expr)), - Expr::Call (v) => { - if !v.name.eq_ignore_ascii_case("now") { - return Err(DataFusionError::Plan( - format!("invalid function call '{}'", v.name), - )); + Expr::Call (Call { name, .. }) => { + if !name.eq_ignore_ascii_case("now") { + return error::query( + format!("invalid function call '{name}'"), + ); } Ok(now()) } @@ -123,14 +120,14 @@ fn reduce_expr(expr: &Expr, tz: Option) -> ExprResult { None, ))), Literal::Duration(v) => Ok(lit(ScalarValue::new_interval_mdn(0, 0, **v))), - _ => Err(DataFusionError::Plan(format!( + _ => error::query(format!( "found literal '{val}', expected duration, float, integer, or timestamp string" - ))), + )), }, - Expr::VarRef { .. } | Expr::BindParameter(_) | Expr::Wildcard(_) | Expr::Distinct(_) => Err(DataFusionError::Plan(format!( + Expr::VarRef { .. } | Expr::BindParameter(_) | Expr::Wildcard(_) | Expr::Distinct(_) => error::query(format!( "found symbol '{expr}', expected now() or a literal duration, float, integer and timestamp string" - ))), + )), } } @@ -190,9 +187,7 @@ fn reduce_binary_lhs_duration_df_expr( BinaryOperator::Sub => { Ok(lit(ScalarValue::new_interval_mdn(0, 0, (lhs - *d) as i64))) } - _ => Err(DataFusionError::Plan(format!( - "found operator '{op}', expected +, -" - ))), + _ => error::query(format!("found operator '{op}', expected +, -")), }, // durations may only be scaled by float literals ScalarValue::Float64(Some(v)) => { @@ -205,9 +200,7 @@ fn reduce_binary_lhs_duration_df_expr( BinaryOperator::Div => { Ok(lit(ScalarValue::new_interval_mdn(0, 0, lhs as i64 / *v))) } - _ => Err(DataFusionError::Plan(format!( - "found operator '{op}', expected *, /" - ))), + _ => error::query(format!("found operator '{op}', expected *, /")), }, // A timestamp may be added to a duration ScalarValue::TimestampNanosecond(Some(v), _) if matches!(op, BinaryOperator::Add) => { @@ -221,16 +214,16 @@ fn reduce_binary_lhs_duration_df_expr( } // This should not occur, as all the DataFusion literal values created by this process // are handled above. - _ => Err(DataFusionError::Internal(format!( + _ => error::internal(format!( "unexpected DataFusion literal '{rhs}' for duration expression" - ))), + )), }, DFExpr::ScalarFunction { .. } => reduce_binary_scalar_df_expr( &expr_to_interval_df_expr(&lit(ScalarValue::new_interval_mdn(0, 0, lhs as i64)), tz)?, op, rhs, ), - _ => Err(DataFusionError::Plan("invalid duration expression".into())), + _ => error::query("invalid duration expression"), } } @@ -261,7 +254,7 @@ fn reduce_binary_lhs_integer_df_expr( DFExpr::Literal(ScalarValue::Utf8(Some(s))) => { reduce_binary_lhs_duration_df_expr(lhs.into(), op, &parse_timestamp_df_expr(s, tz)?, tz) } - _ => Err(DataFusionError::Plan("invalid integer expression".into())), + _ => error::query("invalid integer expression"), } } @@ -272,17 +265,13 @@ fn reduce_binary_lhs_integer_df_expr( /// ``` fn reduce_binary_lhs_float_df_expr(lhs: f64, op: BinaryOperator, rhs: &DFExpr) -> ExprResult { Ok(lit(match rhs { - DFExpr::Literal(ScalarValue::Float64(Some(rhs))) => { - op.try_reduce(lhs, *rhs).ok_or_else(|| { - DataFusionError::Plan("invalid operator for float expression".to_string()) - })? - } - DFExpr::Literal(ScalarValue::Int64(Some(rhs))) => { - op.try_reduce(lhs, *rhs).ok_or_else(|| { - DataFusionError::Plan("invalid operator for float expression".to_string()) - })? - } - _ => return Err(DataFusionError::Plan("invalid float expression".into())), + DFExpr::Literal(ScalarValue::Float64(Some(rhs))) => op + .try_reduce(lhs, *rhs) + .ok_or_else(|| error::map::query("invalid operator for float expression"))?, + DFExpr::Literal(ScalarValue::Int64(Some(rhs))) => op + .try_reduce(lhs, *rhs) + .ok_or_else(|| error::map::query("invalid operator for float expression"))?, + _ => return error::query("invalid float expression"), })) } @@ -306,9 +295,9 @@ fn reduce_binary_lhs_timestamp_df_expr( DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(d))) => match op { BinaryOperator::Add => Ok(lit(ScalarValue::TimestampNanosecond(Some(lhs + *d as i64), None))), BinaryOperator::Sub => Ok(lit(ScalarValue::TimestampNanosecond(Some(lhs - *d as i64), None))), - _ => Err(DataFusionError::Plan( + _ => error::query( format!("invalid operator '{op}' for timestamp and duration: expected +, -"), - )), + ), } DFExpr::Literal(ScalarValue::Int64(_)) // NOTE: This is a slight deviation from InfluxQL, for which the only valid binary @@ -321,9 +310,9 @@ fn reduce_binary_lhs_timestamp_df_expr( &expr_to_interval_df_expr(rhs, tz)?, tz, ), - _ => Err(DataFusionError::Plan( + _ => error::query( format!("invalid expression '{rhs}': expected duration, integer or timestamp string"), - )), + ), } } @@ -335,9 +324,7 @@ fn reduce_binary_scalar_df_expr(lhs: &DFExpr, op: BinaryOperator, rhs: &DFExpr) match op { BinaryOperator::Add => Ok(binary_expr(lhs.clone(), Operator::Plus, rhs.clone())), BinaryOperator::Sub => Ok(binary_expr(lhs.clone(), Operator::Minus, rhs.clone())), - _ => Err(DataFusionError::Plan(format!( - "found operator '{op}', expected +, -" - ))), + _ => error::query(format!("found operator '{op}', expected +, -")), } } @@ -351,11 +338,7 @@ fn expr_to_interval_df_expr(expr: &DFExpr, tz: Option) -> ExprRes DFExpr::Literal(ScalarValue::Int64(Some(v))) => *v, DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(v), _)) => *v, DFExpr::Literal(ScalarValue::Utf8(Some(s))) => parse_timestamp_nanos(s, tz)?, - _ => { - return Err(DataFusionError::Plan(format!( - "unable to cast '{expr}' to duration" - ))) - } + _ => return error::query(format!("unable to cast '{expr}' to duration")), }, ))) } @@ -383,16 +366,16 @@ fn reduce_binary_lhs_string_df_expr( | DFExpr::Literal(ScalarValue::Int64(_)) => { reduce_binary_lhs_timestamp_df_expr(parse_timestamp_nanos(lhs, tz)?, op, rhs, tz) } - _ => Err(DataFusionError::Plan(format!( + _ => error::query(format!( "found '{rhs}', expected duration, integer or timestamp string" - ))), + )), } } fn parse_timestamp_nanos(s: &str, tz: Option) -> Result { parse_timestamp(s, tz) .map(|ts| ts.timestamp_nanos()) - .map_err(|_| DataFusionError::Plan(format!("'{s}' is not a valid timestamp"))) + .map_err(|_| error::map::query(format!("'{s}' is not a valid timestamp"))) } /// Parse s as a timestamp in the specified timezone and return the timestamp diff --git a/iox_query_influxql/src/plan/rewriter.rs b/iox_query_influxql/src/plan/rewriter.rs index ec5daac3eb..1ea72fbc89 100644 --- a/iox_query_influxql/src/plan/rewriter.rs +++ b/iox_query_influxql/src/plan/rewriter.rs @@ -4,7 +4,7 @@ use crate::plan::expr_type_evaluator::evaluate_type; use crate::plan::field::field_name; use crate::plan::field_mapper::{field_and_dimensions, FieldTypeMap, TagSet}; use crate::plan::planner::is_scalar_math_function; -use crate::plan::{util, SchemaProvider}; +use crate::plan::{error, util, SchemaProvider}; use datafusion::common::{DataFusionError, Result}; use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName}; use influxdb_influxql_parser::expression::walk::{walk_expr, walk_expr_mut}; @@ -128,9 +128,7 @@ fn from_field_and_dimensions( } _ => { // Unreachable, as the from clause should be normalised at this point. - return Err(DataFusionError::Internal( - "Unexpected MeasurementSelection in from".to_string(), - )); + return error::internal("Unexpected MeasurementSelection in from"); } } } @@ -342,8 +340,8 @@ fn rewrite_field_list(s: &dyn SchemaProvider, stmt: &mut SelectStatement) -> Res match args.first() { Some(Expr::Wildcard(Some(WildcardType::Tag))) => { - return Err(DataFusionError::External( - format!("unable to use tag as wildcard in {name}()").into(), + return error::query(format!( + "unable to use tag as wildcard in {name}()" )); } Some(Expr::Wildcard(_)) => { @@ -382,10 +380,9 @@ fn rewrite_field_list(s: &dyn SchemaProvider, stmt: &mut SelectStatement) -> Res .is_break(); if has_wildcard { - return Err(DataFusionError::External( - "unsupported expression: contains a wildcard or regular expression" - .into(), - )); + return error::query( + "unsupported expression: contains a wildcard or regular expression", + ); } new_fields.push(f.clone()); @@ -491,19 +488,19 @@ macro_rules! check_exp_args { ($NAME:expr, $EXP:expr, $ARGS:expr) => { let args_len = $ARGS.len(); if args_len != $EXP { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "invalid number of arguments for {}, expected {}, got {args_len}", $NAME, $EXP - ))); + )); } }; ($NAME:expr, $LO:literal, $HI:literal, $ARGS:expr) => { let args_len = $ARGS.len(); if !($LO..=$HI).contains(&args_len) { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "invalid number of arguments for {}, expected at least {} but no more than {}, got {args_len}", $NAME, $LO, $HI - ))); + )); } }; } @@ -513,12 +510,7 @@ macro_rules! lit_integer { ($NAME:expr, $ARGS:expr, $POS:literal) => { match &$ARGS[$POS] { Expr::Literal(Literal::Integer(v)) => *v, - _ => { - return Err(DataFusionError::Plan(format!( - "expected integer argument in {}()", - $NAME - ))) - } + _ => return error::query(format!("expected integer argument in {}()", $NAME)), } }; @@ -536,12 +528,7 @@ macro_rules! lit_string { ($NAME:expr, $ARGS:expr, $POS:literal) => { match &$ARGS[$POS] { Expr::Literal(Literal::String(s)) => s.as_str(), - _ => { - return Err(DataFusionError::Plan(format!( - "expected string argument in {}()", - $NAME - ))) - } + _ => return error::query(format!("expected string argument in {}()", $NAME)), } }; @@ -606,22 +593,17 @@ impl FieldChecker { // // See: https://github.com/influxdata/influxdb/blob/98361e207349a3643bcc332d54b009818fe7585f/query/compile.go#L1002-L1012 if let Some(fill) = q.fill { - return Err(DataFusionError::Plan(format!( - "{fill} must be used with an aggregate function" - ))); + return error::query(format!("{fill} must be used with an aggregate function")); } if self.has_group_by_time && !self.inherited_group_by_time { - return Err(DataFusionError::Plan( - "GROUP BY requires at least one aggregate function".to_owned(), - )); + return error::query("GROUP BY requires at least one aggregate function"); } } 2.. if self.has_top_bottom => { - return Err(DataFusionError::Plan( - "selector functions top and bottom cannot be combined with other functions" - .to_owned(), - )) + return error::query( + "selector functions top and bottom cannot be combined with other functions", + ) } _ => {} } @@ -630,23 +612,19 @@ impl FieldChecker { // // See: https://github.com/influxdata/influxdb/blob/98361e207349a3643bcc332d54b009818fe7585f/query/compile.go#L1013-L1016 if self.has_distinct && (self.function_count() != 1 || self.has_non_aggregate_fields) { - return Err(DataFusionError::Plan( - "aggregate function distinct() cannot be combined with other functions or fields" - .to_owned(), - )); + return error::query( + "aggregate function distinct() cannot be combined with other functions or fields", + ); } // Validate we are using a selector or raw query if non-aggregate fields are projected. if self.has_non_aggregate_fields { if self.aggregate_count > 0 { - return Err(DataFusionError::Plan( - "mixing aggregate and non-aggregate columns is not supported".to_owned(), - )); + return error::query("mixing aggregate and non-aggregate columns is not supported"); } else if self.selector_count > 1 { - return Err(DataFusionError::Plan( - "mixing multiple selector functions with tags or fields is not supported" - .to_owned(), - )); + return error::query( + "mixing multiple selector functions with tags or fields is not supported", + ); } } @@ -692,9 +670,9 @@ impl FieldChecker { ) } Expr::Binary(b) => match (&*b.lhs, &*b.rhs) { - (Expr::Literal(_), Expr::Literal(_)) => Err(DataFusionError::Plan( - "cannot perform a binary expression on two literals".to_owned(), - )), + (Expr::Literal(_), Expr::Literal(_)) => { + error::query("cannot perform a binary expression on two literals") + } (Expr::Literal(_), other) | (other, Expr::Literal(_)) => self.check_expr(other), (lhs, rhs) => { self.check_expr(lhs)?; @@ -713,9 +691,7 @@ impl FieldChecker { "internal: unexpected regex".into(), )), // See: https://github.com/influxdata/influxdb/blob/98361e207349a3643bcc332d54b009818fe7585f/query/compile.go#L347 - Expr::Literal(_) => Err(DataFusionError::Plan( - "field must contain at least one variable".to_owned(), - )), + Expr::Literal(_) => error::query("field must contain at least one variable"), } } @@ -755,9 +731,9 @@ impl FieldChecker { "percentile" => self.check_percentile(&c.args), "sample" => self.check_sample(&c.args), "distinct" => self.check_distinct(&c.args, false), - "top" | "bottom" if self.has_top_bottom => Err(DataFusionError::Plan(format!( + "top" | "bottom" if self.has_top_bottom => error::query(format!( "selector function {name}() cannot be combined with other functions" - ))), + )), "top" | "bottom" => self.check_top_bottom(name, &c.args), "derivative" | "non_negative_derivative" => self.check_derivative(name, &c.args), "difference" | "non_negative_difference" => self.check_difference(name, &c.args), @@ -807,9 +783,7 @@ impl FieldChecker { } self.check_symbol(name, &c.args[0]) } - _ => Err(DataFusionError::Plan(format!( - "unsupported function {name}()" - ))), + _ => error::query(format!("unsupported function {name}()")), } } @@ -821,10 +795,10 @@ impl FieldChecker { &args[1], Expr::Literal(Literal::Integer(_)) | Expr::Literal(Literal::Float(_)) ) { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "expected number for percentile(), got {:?}", &args[1] - ))); + )); } self.check_symbol("percentile", &args[0]) } @@ -838,9 +812,7 @@ impl FieldChecker { // // See: https://github.com/influxdata/influxdb/blob/98361e207349a3643bcc332d54b009818fe7585f/query/compile.go#L441-L443 if v <= 1 { - return Err(DataFusionError::Plan(format!( - "sample window must be greater than 1, got {v}" - ))); + return error::query(format!("sample window must be greater than 1, got {v}")); } self.check_symbol("sample", &args[0]) @@ -852,9 +824,7 @@ impl FieldChecker { check_exp_args!("distinct", 1, args); if !matches!(&args[0], Expr::VarRef(_)) { - return Err(DataFusionError::Plan( - "expected field argument in distinct()".to_owned(), - )); + return error::query("expected field argument in distinct()"); } if !nested { @@ -871,10 +841,10 @@ impl FieldChecker { self.has_top_bottom = true; if args.len() < 2 { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "invalid number of arguments for {name}, expected at least 2, got {}", args.len() - ))); + )); } let (last, args) = args.split_last().expect("length >= 2"); @@ -882,31 +852,29 @@ impl FieldChecker { match last { Expr::Literal(Literal::Integer(limit)) => { if *limit <= 0 { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "limit ({limit}) for {name} must be greater than 0" - ))); + )); } } got => { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "expected integer as last argument for {name}, got {got:?}" - ))) + )) } } let (first, rest) = args.split_first().expect("length >= 1"); if !matches!(first, Expr::VarRef(_)) { - return Err(DataFusionError::Plan(format!( - "expected first argument to be a field for {name}" - ))); + return error::query(format!("expected first argument to be a field for {name}")); } for expr in rest { if !matches!(expr, Expr::VarRef(_)) { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "only fields or tags are allow for {name}(), got {expr:?}" - ))); + )); } } @@ -924,15 +892,13 @@ impl FieldChecker { check_exp_args!(name, 1, 2, args); match args.get(1) { Some(Expr::Literal(Literal::Duration(d))) if **d <= 0 => { - return Err(DataFusionError::Plan(format!( - "duration argument must be positive, got {d}" - ))) + return error::query(format!("duration argument must be positive, got {d}")) } None | Some(Expr::Literal(Literal::Duration(_))) => {} Some(got) => { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "second argument to {name} must be a duration, got {got:?}" - ))) + )) } } @@ -945,15 +911,13 @@ impl FieldChecker { match args.get(1) { Some(Expr::Literal(Literal::Duration(d))) if **d <= 0 => { - return Err(DataFusionError::Plan(format!( - "duration argument must be positive, got {d}" - ))) + return error::query(format!("duration argument must be positive, got {d}")) } None | Some(Expr::Literal(Literal::Duration(_))) => {} Some(got) => { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "second argument to {name} must be a duration, got {got:?}" - ))) + )) } } @@ -980,9 +944,9 @@ impl FieldChecker { let v = lit_integer!("moving_average", args, 1); if v <= 1 { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "moving_average window must be greater than 1, got {v}" - ))); + )); } self.check_nested_symbol("moving_average", &args[0]) @@ -994,22 +958,20 @@ impl FieldChecker { let v = lit_integer!(name, args, 1); if v < 1 { - return Err(DataFusionError::Plan(format!( - "{name} period must be greater than 1, got {v}" - ))); + return error::query(format!("{name} period must be greater than 1, got {v}")); } if let Some(v) = lit_integer!(name, args, 2?) { match (v, name) { (v, "triple_exponential_derivative") if v < 1 && v != -1 => { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "{name} hold period must be greater than or equal to 1" - ))) + )) } (v, _) if v < 0 && v != -1 => { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "{name} hold period must be greater than or equal to 0" - ))) + )) } _ => {} } @@ -1018,9 +980,9 @@ impl FieldChecker { match lit_string!(name, args, 3?) { Some("exponential" | "simple") => {} Some(warmup) => { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "{name} warmup type must be one of: 'exponential', 'simple', got {warmup}" - ))) + )) } None => {} } @@ -1034,16 +996,14 @@ impl FieldChecker { let v = lit_integer!(name, args, 1); if v < 1 { - return Err(DataFusionError::Plan(format!( - "{name} period must be greater than 1, got {v}" - ))); + return error::query(format!("{name} period must be greater than 1, got {v}")); } if let Some(v) = lit_integer!(name, args, 2?) { if v < 0 && v != -1 { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "{name} hold period must be greater than or equal to 0" - ))); + )); } } @@ -1056,25 +1016,23 @@ impl FieldChecker { let v = lit_integer!(name, args, 1); if v < 1 { - return Err(DataFusionError::Plan(format!( - "{name} period must be greater than 1, got {v}" - ))); + return error::query(format!("{name} period must be greater than 1, got {v}")); } if let Some(v) = lit_integer!(name, args, 2?) { if v < 0 && v != -1 { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "{name} hold period must be greater than or equal to 0" - ))); + )); } } match lit_string!(name, args, 3?) { Some("none" | "exponential" | "simple") => {} Some(warmup) => { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "{name} warmup type must be one of: 'none', 'exponential' or 'simple', got {warmup}" - ))) + )) } None => {} } @@ -1088,15 +1046,13 @@ impl FieldChecker { match args.get(1) { Some(Expr::Literal(Literal::Duration(d))) if **d <= 0 => { - return Err(DataFusionError::Plan(format!( - "duration argument must be positive, got {d}" - ))) + return error::query(format!("duration argument must be positive, got {d}")) } None | Some(Expr::Literal(Literal::Duration(_))) => {} Some(got) => { - return Err(DataFusionError::Plan(format!( + return error::query(format!( "second argument to {name} must be a duration, got {got:?}" - ))) + )) } } @@ -1120,26 +1076,20 @@ impl FieldChecker { let v = lit_integer!(name, args, 1); if v < 1 { - return Err(DataFusionError::Plan(format!( - "{name} N argument must be greater than 0, got {v}" - ))); + return error::query(format!("{name} N argument must be greater than 0, got {v}")); } let v = lit_integer!(name, args, 2); if v < 0 { - return Err(DataFusionError::Plan(format!( - "{name} S argument cannot be negative, got {v}" - ))); + return error::query(format!("{name} S argument cannot be negative, got {v}")); } match &args[0] { - Expr::Call(_) if !self.has_group_by_time => Err(DataFusionError::Plan(format!( - "{name} aggregate requires a GROUP BY interval" - ))), + Expr::Call(_) if !self.has_group_by_time => { + error::query(format!("{name} aggregate requires a GROUP BY interval")) + } expr @ Expr::Call(_) => self.check_nested_expr(expr), - _ => Err(DataFusionError::Plan(format!( - "must use aggregate function with {name}" - ))), + _ => error::query(format!("must use aggregate function with {name}")), } } @@ -1168,15 +1118,13 @@ impl FieldChecker { fn check_nested_symbol(&mut self, name: &str, expr: &Expr) -> Result<()> { match expr { - Expr::Call(_) if !self.has_group_by_time => Err(DataFusionError::Plan(format!( - "{name} aggregate requires a GROUP BY interval" - ))), - Expr::Call(_) => self.check_nested_expr(expr), - _ if self.has_group_by_time && !self.inherited_group_by_time => { - Err(DataFusionError::Plan(format!( - "aggregate function required inside the call to {name}" - ))) + Expr::Call(_) if !self.has_group_by_time => { + error::query(format!("{name} aggregate requires a GROUP BY interval")) } + Expr::Call(_) => self.check_nested_expr(expr), + _ if self.has_group_by_time && !self.inherited_group_by_time => error::query(format!( + "aggregate function required inside the call to {name}" + )), _ => self.check_symbol(name, expr), } } @@ -1189,9 +1137,7 @@ impl FieldChecker { Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_)) => Err(DataFusionError::External( "internal: unexpected wildcard or regex".into(), )), - expr => Err(DataFusionError::Plan(format!( - "expected field argument in {name}(), got {expr:?}" - ))), + expr => error::query(format!("expected field argument in {name}(), got {expr:?}")), } } } @@ -1825,14 +1771,14 @@ mod test { let err = rewrite_statement(&namespace, &stmt).unwrap_err(); assert_eq!( err.to_string(), - "External error: unsupported expression: contains a wildcard or regular expression" + "Error during planning: unsupported expression: contains a wildcard or regular expression" ); let stmt = parse_select("SELECT COUNT(*::tag) FROM cpu"); let err = rewrite_statement(&namespace, &stmt).unwrap_err(); assert_eq!( err.to_string(), - "External error: unable to use tag as wildcard in count()" + "Error during planning: unable to use tag as wildcard in count()" ); } diff --git a/iox_query_influxql/src/plan/test_utils.rs b/iox_query_influxql/src/plan/test_utils.rs index 17fdf416ac..54cc2b9887 100644 --- a/iox_query_influxql/src/plan/test_utils.rs +++ b/iox_query_influxql/src/plan/test_utils.rs @@ -1,8 +1,8 @@ //! APIs for testing. #![cfg(test)] -use crate::plan::SchemaProvider; -use datafusion::common::{DataFusionError, Result as DataFusionResult}; +use crate::plan::{error, SchemaProvider}; +use datafusion::common::Result as DataFusionResult; use datafusion::datasource::empty::EmptyTable; use datafusion::datasource::provider_as_source; use datafusion::logical_expr::{AggregateUDF, ScalarUDF, TableSource}; @@ -156,7 +156,7 @@ impl SchemaProvider for MockSchemaProvider { self.tables .get(name) .map(|(t, _)| Arc::clone(t)) - .ok_or_else(|| DataFusionError::Plan(format!("measurement does not exist: {name}"))) + .ok_or_else(|| error::map::query(format!("measurement does not exist: {name}"))) } fn get_function_meta(&self, _name: &str) -> Option> { diff --git a/iox_query_influxql/src/plan/timestamp.rs b/iox_query_influxql/src/plan/timestamp.rs index 0776413ba8..35b5e65f9b 100644 --- a/iox_query_influxql/src/plan/timestamp.rs +++ b/iox_query_influxql/src/plan/timestamp.rs @@ -1,5 +1,6 @@ +use crate::plan::error; use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Offset, TimeZone}; -use datafusion::common::{DataFusionError, Result}; +use datafusion::common::Result; /// Parse the timestamp string and return a DateTime in UTC. fn parse_timestamp_utc(s: &str) -> Result> { @@ -19,7 +20,7 @@ fn parse_timestamp_utc(s: &str) -> Result> { .map(|nd| nd.and_time(NaiveTime::default())), ) .map(|ts| DateTime::from_utc(ts, chrono::Utc.fix())) - .map_err(|_| DataFusionError::Plan("invalid timestamp string".into())) + .map_err(|_| error::map::query("invalid timestamp string")) } /// Parse the timestamp string and return a DateTime in the specified timezone. @@ -50,7 +51,7 @@ fn parse_timestamp_tz(s: &str, tz: chrono_tz::Tz) -> Result Ope pub(in crate::plan) fn schema_from_df(schema: &DFSchema) -> Result { let s: Arc = Arc::new(schema.into()); s.try_into().map_err(|err| { - DataFusionError::Internal(format!( + error::map::internal(format!( "unable to convert DataFusion schema to IOx schema: {err}" )) }) @@ -53,9 +51,8 @@ impl Schemas { /// Sanitize an InfluxQL regular expression and create a compiled [`regex::Regex`]. pub(crate) fn parse_regex(re: &Regex) -> Result { let pattern = clean_non_meta_escapes(re.as_str()); - regex::Regex::new(&pattern).map_err(|e| { - DataFusionError::External(format!("invalid regular expression '{re}': {e}").into()) - }) + regex::Regex::new(&pattern) + .map_err(|e| error::map::query(format!("invalid regular expression '{re}': {e}"))) } /// Returns `n` as a literal expression of the specified `data_type`. @@ -69,9 +66,7 @@ fn number_to_expr(n: &Number, data_type: DataType) -> Result { (Number::Float(v), DataType::UInt64) => lit(*v as u64), (n, data_type) => { // The only output data types expected are Int64, Float64 or UInt64 - return Err(DataFusionError::Internal(format!( - "no conversion from {n} to {data_type}" - ))); + return error::internal(format!("no conversion from {n} to {data_type}")); } }) } @@ -119,51 +114,3 @@ pub(crate) fn rebase_expr( }) } } - -/// Returns `true` if `expr` is an [`Expr::AggregateUDF`] for one of -/// the selector functions. -#[allow(unused)] -pub(crate) fn is_selector_aggregate_udf(expr: &Expr) -> bool { - static FUNCTIONS: Lazy> = Lazy::new(|| { - vec![ - "selector_first", - "selector_last", - "selector_max", - "selector_min", - ] - }); - - matches!(expr, Expr::AggregateUDF { fun, ..} if FUNCTIONS.contains(&fun.name.as_str())) -} - -/// Collect all the references to selector functions, such as `selector_last`. -/// They are returned in order of occurrence (depth first), with duplicates omitted. -#[allow(unused)] -pub(crate) fn find_aggregate_selector_exprs(exprs: &[Expr]) -> Vec { - exprs - .iter() - .flat_map(|expr| { - // Contains a list of unique selector UDAFs - let mut exprs = vec![]; - - expr.apply(&mut |expr| { - if is_selector_aggregate_udf(expr) { - if !exprs.contains(expr) { - exprs.push(expr.clone()); - } - Ok(VisitRecursion::Skip) - } else { - Ok(VisitRecursion::Continue) - } - }) - .unwrap(); - - exprs - }) - .fold(vec![], |mut acc, expr| { - if !acc.contains(&expr) { - acc.push(expr) - } - acc - }) -} diff --git a/iox_query_influxql/src/plan/util_copy.rs b/iox_query_influxql/src/plan/util_copy.rs index d420ca7caa..2ab65be451 100644 --- a/iox_query_influxql/src/plan/util_copy.rs +++ b/iox_query_influxql/src/plan/util_copy.rs @@ -7,7 +7,7 @@ //! If these APIs are stabilised and made public, they can be removed from IOx. //! //! NOTE -use datafusion::common::{DataFusionError, Result}; +use datafusion::common::Result; use datafusion::logical_expr::{ expr::{ AggregateFunction, Between, BinaryExpr, Case, Cast, Expr, GetIndexedField, GroupingSet,