chore: update all remaining code to use `error` and `error::map` module

pull/24376/head
Stuart Carnie 2023-04-16 08:00:12 +10:00
parent 69d75745cc
commit 8274d584f5
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
9 changed files with 173 additions and 323 deletions
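
For orientation, the pattern applied throughout this commit is to replace direct construction of `DataFusionError` variants with small helpers from the planner's `error` module: `error::query`, `error::internal` and `error::not_implemented` are used in return/tail position, while the `error::map` submodule supplies plain error values for `map_err` and `ok_or_else` closures. Below is a minimal sketch of what such a module could look like, inferred only from the call sites in the hunks that follow; it is not the actual implementation, and names and signatures may differ.

use datafusion::common::{DataFusionError, Result};

/// Error values, for use inside `map_err`/`ok_or_else` closures.
pub(crate) mod map {
    use datafusion::common::DataFusionError;

    /// A query-planning error (wraps `DataFusionError::Plan`).
    pub(crate) fn query(msg: impl Into<String>) -> DataFusionError {
        DataFusionError::Plan(msg.into())
    }

    /// An internal planner error (wraps `DataFusionError::Internal`).
    pub(crate) fn internal(msg: impl Into<String>) -> DataFusionError {
        DataFusionError::Internal(msg.into())
    }
}

/// `Err`-wrapping variants, for use directly in return/tail position.
pub(crate) fn query<T>(msg: impl Into<String>) -> Result<T> {
    Err(map::query(msg))
}

pub(crate) fn internal<T>(msg: impl Into<String>) -> Result<T> {
    Err(map::internal(msg))
}

pub(crate) fn not_implemented<T>(msg: impl Into<String>) -> Result<T> {
    Err(DataFusionError::NotImplemented(msg.into()))
}

Under that assumption, a call site such as `Err(DataFusionError::NotImplemented("SLIMIT or SOFFSET".into()))` becomes `error::not_implemented("SLIMIT or SOFFSET")`, and `.map_err(|_| DataFusionError::Plan("limit out of range".to_owned()))` becomes `.map_err(|_| error::map::query("limit out of range"))`, as the hunks below show.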

View File

@ -1,7 +1,7 @@
use crate::plan::field::field_by_name;
use crate::plan::field_mapper::map_type;
use crate::plan::SchemaProvider;
use datafusion::common::{DataFusionError, Result};
use crate::plan::{error, SchemaProvider};
use datafusion::common::Result;
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
use influxdb_influxql_parser::expression::{Call, Expr, VarRef, VarRefDataType};
use influxdb_influxql_parser::literal::Literal;
@ -113,9 +113,7 @@ impl<'a> TypeEvaluator<'a> {
}
}
_ => {
return Err(DataFusionError::Internal(
"eval_var_ref: Unexpected MeasurementSelection".to_string(),
))
return error::internal("eval_var_ref: Unexpected MeasurementSelection")
}
}
}

View File

@ -1,5 +1,3 @@
#![allow(dead_code)]
use crate::plan::var_ref::field_type_to_var_ref_data_type;
use crate::plan::SchemaProvider;
use datafusion::common::Result;

View File

@ -20,7 +20,7 @@ use arrow::record_batch::RecordBatch;
use chrono_tz::Tz;
use datafusion::catalog::TableReference;
use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, ToDFSchema};
use datafusion::common::{DFSchema, DFSchemaRef, Result, ScalarValue, ToDFSchema};
use datafusion::datasource::{provider_as_source, MemTable};
use datafusion::logical_expr::expr_rewriter::normalize_col;
use datafusion::logical_expr::logical_plan::builder::project;
@ -214,9 +214,8 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// grouped into tables in the output when formatted as InfluxQL tabular format.
let measurement_column_index = schema
.index_of_column_by_name(None, "plan_type")?
.ok_or_else(|| {
DataFusionError::External("internal: unable to find plan_type column".into())
})? as u32;
.ok_or_else(|| error::map::internal("unable to find plan_type column"))?
as u32;
let (analyze, verbose) = match explain.options {
Some(ExplainOption::AnalyzeVerbose) => (true, true),
@ -767,11 +766,11 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let limit = limit
.map(|v| <u64 as TryInto<i64>>::try_into(*v))
.transpose()
.map_err(|_| DataFusionError::Plan("limit out of range".to_owned()))?;
.map_err(|_| error::map::query("limit out of range"))?;
let offset = offset
.map(|v| <u64 as TryInto<i64>>::try_into(*v))
.transpose()
.map_err(|_| DataFusionError::Plan("offset out of range".to_owned()))?;
.map_err(|_| error::map::query("offset out of range".to_owned()))?;
// a reference to the ROW_NUMBER column.
let row_alias = IOX_ROW_ALIAS.as_expr();
@ -835,7 +834,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
return Ok(input);
}
Err(DataFusionError::NotImplemented("SLIMIT or SOFFSET".into()))
error::not_implemented("SLIMIT or SOFFSET")
}
/// Map the InfluxQL `SELECT` projection list into a list of DataFusion expressions.
@ -941,9 +940,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let iox_schema = &schemas.iox_schema;
match iql {
// rewriter is expected to expand wildcard expressions
IQLExpr::Wildcard(_) => Err(DataFusionError::Internal(
"unexpected wildcard in projection".into(),
)),
IQLExpr::Wildcard(_) => error::internal("unexpected wildcard in projection"),
IQLExpr::VarRef(VarRef {
name,
data_type: opt_dst_type,
@ -989,7 +986,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
},
})
}
IQLExpr::BindParameter(_) => Err(DataFusionError::NotImplemented("parameter".into())),
IQLExpr::BindParameter(_) => error::not_implemented("parameter"),
IQLExpr::Literal(val) => match val {
Literal::Integer(v) => Ok(lit(*v)),
Literal::Unsigned(v) => Ok(lit(*v)),
@ -1000,19 +997,17 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Some(v.timestamp()),
None,
))),
Literal::Duration(_) => {
Err(DataFusionError::NotImplemented("duration literal".into()))
}
Literal::Duration(_) => error::not_implemented("duration literal"),
Literal::Regex(re) => match ctx.scope {
// a regular expression in a projection list is unexpected,
// as it should have been expanded by the rewriter.
ExprScope::Projection => Err(DataFusionError::Internal(
"unexpected regular expression found in projection".into(),
)),
ExprScope::Projection => {
error::internal("unexpected regular expression found in projection")
}
ExprScope::Where => Ok(lit(clean_non_meta_escapes(re.as_str()))),
},
},
IQLExpr::Distinct(_) => Err(DataFusionError::NotImplemented("DISTINCT".into())),
IQLExpr::Distinct(_) => error::not_implemented("DISTINCT"),
IQLExpr::Call(call) => self.call_to_df_expr(ctx, call, schemas),
IQLExpr::Binary(expr) => self.arithmetic_expr_to_df_expr(ctx, expr, schemas),
IQLExpr::Nested(e) => self.expr_to_df_expr(ctx, e, schemas),
@ -1043,12 +1038,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
match ctx.scope {
ExprScope::Where => {
if call.name.eq_ignore_ascii_case("now") {
Err(DataFusionError::NotImplemented("now".into()))
error::not_implemented("now")
} else {
let name = &call.name;
Err(DataFusionError::External(
format!("invalid function call in condition: {name}").into(),
))
error::query(format!("invalid function call in condition: {name}"))
}
}
ExprScope::Projection => self.function_to_df_expr(ctx, call, schemas),
@ -1064,9 +1057,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fn check_arg_count(name: &str, args: &[IQLExpr], count: usize) -> Result<()> {
let got = args.len();
if got != count {
Err(DataFusionError::Plan(format!(
error::query(format!(
"invalid number of arguments for {name}: expected {count}, got {got}"
)))
))
} else {
Ok(())
}
@ -1164,9 +1157,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
match BuiltinScalarFunction::from_str(call.name.as_str())? {
BuiltinScalarFunction::Log => {
if args.len() != 2 {
Err(DataFusionError::Plan(
"invalid number of arguments for log, expected 2, got 1".to_owned(),
))
error::query("invalid number of arguments for log, expected 2, got 1")
} else {
Ok(Expr::ScalarFunction {
fun: BuiltinScalarFunction::Log,
@ -1233,13 +1224,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
self.create_table_ref(normalize_identifier(ident))
}
// rewriter is expected to expand the regular expression
MeasurementName::Regex(_) => Err(DataFusionError::Internal(
"unexpected regular expression in FROM clause".into(),
)),
MeasurementName::Regex(_) => error::internal(
"unexpected regular expression in FROM clause",
),
},
MeasurementSelection::Subquery(_) => Err(DataFusionError::NotImplemented(
"subquery in FROM clause".into(),
)),
MeasurementSelection::Subquery(_) => error::not_implemented(
"subquery in FROM clause",
),
}? else { continue };
table_projs.push_back(table_proj);
}
@ -1280,14 +1271,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let mut out = HashSet::new();
for qualified_name in from.iter() {
if qualified_name.database.is_some() {
return Err(DataFusionError::NotImplemented(
"database name in from clause".into(),
));
return error::not_implemented("database name in from clause");
}
if qualified_name.retention_policy.is_some() {
return Err(DataFusionError::NotImplemented(
"retention policy in from clause".into(),
));
return error::not_implemented("retention policy in from clause");
}
match &qualified_name.name {
MeasurementName::Name(name) => {
@ -1320,9 +1307,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
) -> Result<LogicalPlan> {
if show_field_keys.database.is_some() {
// How do we handle this? Do we need to perform cross-namespace queries here?
return Err(DataFusionError::NotImplemented(
"SHOW FIELD KEYS ON <database>".into(),
));
return error::not_implemented("SHOW FIELD KEYS ON <database>");
}
let field_key_col = "fieldKey";
@ -1414,9 +1399,7 @@ fn build_gap_fill_node(
// added by the planner.
let (stride, time_range, origin) = if date_bin_args.len() == 3 {
let time_col = date_bin_args[1].try_into_col().map_err(|_| {
DataFusionError::Internal(
"DATE_BIN requires a column as the source argument".to_string(),
)
error::map::internal("DATE_BIN requires a column as the source argument")
})?;
// Ensure that a time range was specified and is valid for gap filling
@ -1443,10 +1426,10 @@ fn build_gap_fill_node(
} else {
// This is an internal error as the date_bin function is added by the planner and should
// always contain the correct number of arguments.
return Err(DataFusionError::Internal(format!(
return error::internal(format!(
"DATE_BIN expects 3 arguments, got {}",
date_bin_args.len()
)));
));
};
let aggr = Aggregate::try_from_plan(&input)?;
@ -1487,7 +1470,7 @@ fn build_gap_fill_node(
fn plan_with_metadata(plan: LogicalPlan, metadata: &InfluxQlMetadata) -> Result<LogicalPlan> {
fn make_schema(schema: DFSchemaRef, metadata: &InfluxQlMetadata) -> Result<DFSchemaRef> {
let data = serde_json::to_string(metadata).map_err(|err| {
DataFusionError::Internal(format!("error serializing InfluxQL metadata: {err}"))
error::map::internal(format!("error serializing InfluxQL metadata: {err}"))
})?;
let mut md = schema.metadata().clone();
@ -1592,11 +1575,7 @@ fn plan_with_metadata(plan: LogicalPlan, metadata: &InfluxQlMetadata) -> Result<
t.projected_schema = make_schema(Arc::clone(&src.projected_schema), metadata)?;
LogicalPlan::TableScan(t)
}
_ => {
return Err(DataFusionError::External(
format!("unexpected LogicalPlan: {}", input.display()).into(),
))
}
_ => return error::internal(format!("unexpected LogicalPlan: {}", input.display())),
})
}
@ -1705,9 +1684,7 @@ fn conditional_op_to_operator(op: ConditionalOperator) -> Result<Operator> {
ConditionalOperator::And => Ok(Operator::And),
ConditionalOperator::Or => Ok(Operator::Or),
// NOTE: This is not supported by InfluxQL SELECT expressions, so it is unexpected
ConditionalOperator::In => Err(DataFusionError::Internal(
"unexpected binary operator: IN".into(),
)),
ConditionalOperator::In => error::internal("unexpected binary operator: IN"),
}
}
@ -1815,7 +1792,7 @@ fn is_time_field(cond: &ConditionalExpression) -> bool {
fn find_expr(cond: &ConditionalExpression) -> Result<&IQLExpr> {
cond.expr()
.ok_or_else(|| DataFusionError::Internal("incomplete conditional expression".into()))
.ok_or_else(|| error::map::internal("incomplete conditional expression"))
}
#[cfg(test)]

View File

@ -1,8 +1,9 @@
use crate::plan::error;
use crate::plan::timestamp::parse_timestamp;
use crate::plan::util::binary_operator_to_df_operator;
use datafusion::common::{DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{binary_expr, lit, now, BinaryExpr, Expr as DFExpr, Operator};
use influxdb_influxql_parser::expression::{Binary, BinaryOperator};
use influxdb_influxql_parser::expression::{Binary, BinaryOperator, Call};
use influxdb_influxql_parser::{expression::Expr, literal::Literal};
type ExprResult = Result<DFExpr>;
@ -62,11 +63,7 @@ pub(in crate::plan) fn time_range_to_df_expr(expr: &Expr, tz: Option<chrono_tz::
DFExpr::Literal(ScalarValue::Int64(Some(v))) => {
DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(v), None))
}
_ => {
return Err(DataFusionError::Plan(
"invalid time range expression".into(),
))
}
_ => return error::query("invalid time range expression"),
})
}
@ -85,13 +82,13 @@ pub(super) fn duration_expr_to_nanoseconds(expr: &Expr) -> Result<i64> {
DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(v))) => Ok(v as i64),
DFExpr::Literal(ScalarValue::Float64(Some(v))) => Ok(v as i64),
DFExpr::Literal(ScalarValue::Int64(Some(v))) => Ok(v),
_ => Err(DataFusionError::Plan("invalid duration expression".into())),
_ => error::query("invalid duration expression"),
}
}
fn map_expr_err(expr: &Expr) -> impl Fn(DataFusionError) -> DataFusionError + '_ {
move |err| {
DataFusionError::Plan(format!(
error::map::query(format!(
"invalid expression \"{}\": {}",
expr,
match err {
@ -105,11 +102,11 @@ fn map_expr_err(expr: &Expr) -> impl Fn(DataFusionError) -> DataFusionError + '_
fn reduce_expr(expr: &Expr, tz: Option<chrono_tz::Tz>) -> ExprResult {
match expr {
Expr::Binary(v) => reduce_binary_expr(v, tz).map_err(map_expr_err(expr)),
Expr::Call (v) => {
if !v.name.eq_ignore_ascii_case("now") {
return Err(DataFusionError::Plan(
format!("invalid function call '{}'", v.name),
));
Expr::Call (Call { name, .. }) => {
if !name.eq_ignore_ascii_case("now") {
return error::query(
format!("invalid function call '{name}'"),
);
}
Ok(now())
}
@ -123,14 +120,14 @@ fn reduce_expr(expr: &Expr, tz: Option<chrono_tz::Tz>) -> ExprResult {
None,
))),
Literal::Duration(v) => Ok(lit(ScalarValue::new_interval_mdn(0, 0, **v))),
_ => Err(DataFusionError::Plan(format!(
_ => error::query(format!(
"found literal '{val}', expected duration, float, integer, or timestamp string"
))),
)),
},
Expr::VarRef { .. } | Expr::BindParameter(_) | Expr::Wildcard(_) | Expr::Distinct(_) => Err(DataFusionError::Plan(format!(
Expr::VarRef { .. } | Expr::BindParameter(_) | Expr::Wildcard(_) | Expr::Distinct(_) => error::query(format!(
"found symbol '{expr}', expected now() or a literal duration, float, integer and timestamp string"
))),
)),
}
}
@ -190,9 +187,7 @@ fn reduce_binary_lhs_duration_df_expr(
BinaryOperator::Sub => {
Ok(lit(ScalarValue::new_interval_mdn(0, 0, (lhs - *d) as i64)))
}
_ => Err(DataFusionError::Plan(format!(
"found operator '{op}', expected +, -"
))),
_ => error::query(format!("found operator '{op}', expected +, -")),
},
// durations may only be scaled by float literals
ScalarValue::Float64(Some(v)) => {
@ -205,9 +200,7 @@ fn reduce_binary_lhs_duration_df_expr(
BinaryOperator::Div => {
Ok(lit(ScalarValue::new_interval_mdn(0, 0, lhs as i64 / *v)))
}
_ => Err(DataFusionError::Plan(format!(
"found operator '{op}', expected *, /"
))),
_ => error::query(format!("found operator '{op}', expected *, /")),
},
// A timestamp may be added to a duration
ScalarValue::TimestampNanosecond(Some(v), _) if matches!(op, BinaryOperator::Add) => {
@ -221,16 +214,16 @@ fn reduce_binary_lhs_duration_df_expr(
}
// This should not occur, as all the DataFusion literal values created by this process
// are handled above.
_ => Err(DataFusionError::Internal(format!(
_ => error::internal(format!(
"unexpected DataFusion literal '{rhs}' for duration expression"
))),
)),
},
DFExpr::ScalarFunction { .. } => reduce_binary_scalar_df_expr(
&expr_to_interval_df_expr(&lit(ScalarValue::new_interval_mdn(0, 0, lhs as i64)), tz)?,
op,
rhs,
),
_ => Err(DataFusionError::Plan("invalid duration expression".into())),
_ => error::query("invalid duration expression"),
}
}
@ -261,7 +254,7 @@ fn reduce_binary_lhs_integer_df_expr(
DFExpr::Literal(ScalarValue::Utf8(Some(s))) => {
reduce_binary_lhs_duration_df_expr(lhs.into(), op, &parse_timestamp_df_expr(s, tz)?, tz)
}
_ => Err(DataFusionError::Plan("invalid integer expression".into())),
_ => error::query("invalid integer expression"),
}
}
@ -272,17 +265,13 @@ fn reduce_binary_lhs_integer_df_expr(
/// ```
fn reduce_binary_lhs_float_df_expr(lhs: f64, op: BinaryOperator, rhs: &DFExpr) -> ExprResult {
Ok(lit(match rhs {
DFExpr::Literal(ScalarValue::Float64(Some(rhs))) => {
op.try_reduce(lhs, *rhs).ok_or_else(|| {
DataFusionError::Plan("invalid operator for float expression".to_string())
})?
}
DFExpr::Literal(ScalarValue::Int64(Some(rhs))) => {
op.try_reduce(lhs, *rhs).ok_or_else(|| {
DataFusionError::Plan("invalid operator for float expression".to_string())
})?
}
_ => return Err(DataFusionError::Plan("invalid float expression".into())),
DFExpr::Literal(ScalarValue::Float64(Some(rhs))) => op
.try_reduce(lhs, *rhs)
.ok_or_else(|| error::map::query("invalid operator for float expression"))?,
DFExpr::Literal(ScalarValue::Int64(Some(rhs))) => op
.try_reduce(lhs, *rhs)
.ok_or_else(|| error::map::query("invalid operator for float expression"))?,
_ => return error::query("invalid float expression"),
}))
}
@ -306,9 +295,9 @@ fn reduce_binary_lhs_timestamp_df_expr(
DFExpr::Literal(ScalarValue::IntervalMonthDayNano(Some(d))) => match op {
BinaryOperator::Add => Ok(lit(ScalarValue::TimestampNanosecond(Some(lhs + *d as i64), None))),
BinaryOperator::Sub => Ok(lit(ScalarValue::TimestampNanosecond(Some(lhs - *d as i64), None))),
_ => Err(DataFusionError::Plan(
_ => error::query(
format!("invalid operator '{op}' for timestamp and duration: expected +, -"),
)),
),
}
DFExpr::Literal(ScalarValue::Int64(_))
// NOTE: This is a slight deviation from InfluxQL, for which the only valid binary
@ -321,9 +310,9 @@ fn reduce_binary_lhs_timestamp_df_expr(
&expr_to_interval_df_expr(rhs, tz)?,
tz,
),
_ => Err(DataFusionError::Plan(
_ => error::query(
format!("invalid expression '{rhs}': expected duration, integer or timestamp string"),
)),
),
}
}
@ -335,9 +324,7 @@ fn reduce_binary_scalar_df_expr(lhs: &DFExpr, op: BinaryOperator, rhs: &DFExpr)
match op {
BinaryOperator::Add => Ok(binary_expr(lhs.clone(), Operator::Plus, rhs.clone())),
BinaryOperator::Sub => Ok(binary_expr(lhs.clone(), Operator::Minus, rhs.clone())),
_ => Err(DataFusionError::Plan(format!(
"found operator '{op}', expected +, -"
))),
_ => error::query(format!("found operator '{op}', expected +, -")),
}
}
@ -351,11 +338,7 @@ fn expr_to_interval_df_expr(expr: &DFExpr, tz: Option<chrono_tz::Tz>) -> ExprRes
DFExpr::Literal(ScalarValue::Int64(Some(v))) => *v,
DFExpr::Literal(ScalarValue::TimestampNanosecond(Some(v), _)) => *v,
DFExpr::Literal(ScalarValue::Utf8(Some(s))) => parse_timestamp_nanos(s, tz)?,
_ => {
return Err(DataFusionError::Plan(format!(
"unable to cast '{expr}' to duration"
)))
}
_ => return error::query(format!("unable to cast '{expr}' to duration")),
},
)))
}
@ -383,16 +366,16 @@ fn reduce_binary_lhs_string_df_expr(
| DFExpr::Literal(ScalarValue::Int64(_)) => {
reduce_binary_lhs_timestamp_df_expr(parse_timestamp_nanos(lhs, tz)?, op, rhs, tz)
}
_ => Err(DataFusionError::Plan(format!(
_ => error::query(format!(
"found '{rhs}', expected duration, integer or timestamp string"
))),
)),
}
}
fn parse_timestamp_nanos(s: &str, tz: Option<chrono_tz::Tz>) -> Result<i64> {
parse_timestamp(s, tz)
.map(|ts| ts.timestamp_nanos())
.map_err(|_| DataFusionError::Plan(format!("'{s}' is not a valid timestamp")))
.map_err(|_| error::map::query(format!("'{s}' is not a valid timestamp")))
}
/// Parse s as a timestamp in the specified timezone and return the timestamp

View File

@ -4,7 +4,7 @@ use crate::plan::expr_type_evaluator::evaluate_type;
use crate::plan::field::field_name;
use crate::plan::field_mapper::{field_and_dimensions, FieldTypeMap, TagSet};
use crate::plan::planner::is_scalar_math_function;
use crate::plan::{util, SchemaProvider};
use crate::plan::{error, util, SchemaProvider};
use datafusion::common::{DataFusionError, Result};
use influxdb_influxql_parser::common::{MeasurementName, QualifiedMeasurementName};
use influxdb_influxql_parser::expression::walk::{walk_expr, walk_expr_mut};
@ -128,9 +128,7 @@ fn from_field_and_dimensions(
}
_ => {
// Unreachable, as the from clause should be normalised at this point.
return Err(DataFusionError::Internal(
"Unexpected MeasurementSelection in from".to_string(),
));
return error::internal("Unexpected MeasurementSelection in from");
}
}
}
@ -342,8 +340,8 @@ fn rewrite_field_list(s: &dyn SchemaProvider, stmt: &mut SelectStatement) -> Res
match args.first() {
Some(Expr::Wildcard(Some(WildcardType::Tag))) => {
return Err(DataFusionError::External(
format!("unable to use tag as wildcard in {name}()").into(),
return error::query(format!(
"unable to use tag as wildcard in {name}()"
));
}
Some(Expr::Wildcard(_)) => {
@ -382,10 +380,9 @@ fn rewrite_field_list(s: &dyn SchemaProvider, stmt: &mut SelectStatement) -> Res
.is_break();
if has_wildcard {
return Err(DataFusionError::External(
"unsupported expression: contains a wildcard or regular expression"
.into(),
));
return error::query(
"unsupported expression: contains a wildcard or regular expression",
);
}
new_fields.push(f.clone());
@ -491,19 +488,19 @@ macro_rules! check_exp_args {
($NAME:expr, $EXP:expr, $ARGS:expr) => {
let args_len = $ARGS.len();
if args_len != $EXP {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"invalid number of arguments for {}, expected {}, got {args_len}",
$NAME, $EXP
)));
));
}
};
($NAME:expr, $LO:literal, $HI:literal, $ARGS:expr) => {
let args_len = $ARGS.len();
if !($LO..=$HI).contains(&args_len) {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"invalid number of arguments for {}, expected at least {} but no more than {}, got {args_len}",
$NAME, $LO, $HI
)));
));
}
};
}
@ -513,12 +510,7 @@ macro_rules! lit_integer {
($NAME:expr, $ARGS:expr, $POS:literal) => {
match &$ARGS[$POS] {
Expr::Literal(Literal::Integer(v)) => *v,
_ => {
return Err(DataFusionError::Plan(format!(
"expected integer argument in {}()",
$NAME
)))
}
_ => return error::query(format!("expected integer argument in {}()", $NAME)),
}
};
@ -536,12 +528,7 @@ macro_rules! lit_string {
($NAME:expr, $ARGS:expr, $POS:literal) => {
match &$ARGS[$POS] {
Expr::Literal(Literal::String(s)) => s.as_str(),
_ => {
return Err(DataFusionError::Plan(format!(
"expected string argument in {}()",
$NAME
)))
}
_ => return error::query(format!("expected string argument in {}()", $NAME)),
}
};
@ -606,22 +593,17 @@ impl FieldChecker {
//
// See: https://github.com/influxdata/influxdb/blob/98361e207349a3643bcc332d54b009818fe7585f/query/compile.go#L1002-L1012
if let Some(fill) = q.fill {
return Err(DataFusionError::Plan(format!(
"{fill} must be used with an aggregate function"
)));
return error::query(format!("{fill} must be used with an aggregate function"));
}
if self.has_group_by_time && !self.inherited_group_by_time {
return Err(DataFusionError::Plan(
"GROUP BY requires at least one aggregate function".to_owned(),
));
return error::query("GROUP BY requires at least one aggregate function");
}
}
2.. if self.has_top_bottom => {
return Err(DataFusionError::Plan(
"selector functions top and bottom cannot be combined with other functions"
.to_owned(),
))
return error::query(
"selector functions top and bottom cannot be combined with other functions",
)
}
_ => {}
}
@ -630,23 +612,19 @@ impl FieldChecker {
//
// See: https://github.com/influxdata/influxdb/blob/98361e207349a3643bcc332d54b009818fe7585f/query/compile.go#L1013-L1016
if self.has_distinct && (self.function_count() != 1 || self.has_non_aggregate_fields) {
return Err(DataFusionError::Plan(
"aggregate function distinct() cannot be combined with other functions or fields"
.to_owned(),
));
return error::query(
"aggregate function distinct() cannot be combined with other functions or fields",
);
}
// Validate we are using a selector or raw query if non-aggregate fields are projected.
if self.has_non_aggregate_fields {
if self.aggregate_count > 0 {
return Err(DataFusionError::Plan(
"mixing aggregate and non-aggregate columns is not supported".to_owned(),
));
return error::query("mixing aggregate and non-aggregate columns is not supported");
} else if self.selector_count > 1 {
return Err(DataFusionError::Plan(
"mixing multiple selector functions with tags or fields is not supported"
.to_owned(),
));
return error::query(
"mixing multiple selector functions with tags or fields is not supported",
);
}
}
@ -692,9 +670,9 @@ impl FieldChecker {
)
}
Expr::Binary(b) => match (&*b.lhs, &*b.rhs) {
(Expr::Literal(_), Expr::Literal(_)) => Err(DataFusionError::Plan(
"cannot perform a binary expression on two literals".to_owned(),
)),
(Expr::Literal(_), Expr::Literal(_)) => {
error::query("cannot perform a binary expression on two literals")
}
(Expr::Literal(_), other) | (other, Expr::Literal(_)) => self.check_expr(other),
(lhs, rhs) => {
self.check_expr(lhs)?;
@ -713,9 +691,7 @@ impl FieldChecker {
"internal: unexpected regex".into(),
)),
// See: https://github.com/influxdata/influxdb/blob/98361e207349a3643bcc332d54b009818fe7585f/query/compile.go#L347
Expr::Literal(_) => Err(DataFusionError::Plan(
"field must contain at least one variable".to_owned(),
)),
Expr::Literal(_) => error::query("field must contain at least one variable"),
}
}
@ -755,9 +731,9 @@ impl FieldChecker {
"percentile" => self.check_percentile(&c.args),
"sample" => self.check_sample(&c.args),
"distinct" => self.check_distinct(&c.args, false),
"top" | "bottom" if self.has_top_bottom => Err(DataFusionError::Plan(format!(
"top" | "bottom" if self.has_top_bottom => error::query(format!(
"selector function {name}() cannot be combined with other functions"
))),
)),
"top" | "bottom" => self.check_top_bottom(name, &c.args),
"derivative" | "non_negative_derivative" => self.check_derivative(name, &c.args),
"difference" | "non_negative_difference" => self.check_difference(name, &c.args),
@ -807,9 +783,7 @@ impl FieldChecker {
}
self.check_symbol(name, &c.args[0])
}
_ => Err(DataFusionError::Plan(format!(
"unsupported function {name}()"
))),
_ => error::query(format!("unsupported function {name}()")),
}
}
@ -821,10 +795,10 @@ impl FieldChecker {
&args[1],
Expr::Literal(Literal::Integer(_)) | Expr::Literal(Literal::Float(_))
) {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"expected number for percentile(), got {:?}",
&args[1]
)));
));
}
self.check_symbol("percentile", &args[0])
}
@ -838,9 +812,7 @@ impl FieldChecker {
//
// See: https://github.com/influxdata/influxdb/blob/98361e207349a3643bcc332d54b009818fe7585f/query/compile.go#L441-L443
if v <= 1 {
return Err(DataFusionError::Plan(format!(
"sample window must be greater than 1, got {v}"
)));
return error::query(format!("sample window must be greater than 1, got {v}"));
}
self.check_symbol("sample", &args[0])
@ -852,9 +824,7 @@ impl FieldChecker {
check_exp_args!("distinct", 1, args);
if !matches!(&args[0], Expr::VarRef(_)) {
return Err(DataFusionError::Plan(
"expected field argument in distinct()".to_owned(),
));
return error::query("expected field argument in distinct()");
}
if !nested {
@ -871,10 +841,10 @@ impl FieldChecker {
self.has_top_bottom = true;
if args.len() < 2 {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"invalid number of arguments for {name}, expected at least 2, got {}",
args.len()
)));
));
}
let (last, args) = args.split_last().expect("length >= 2");
@ -882,31 +852,29 @@ impl FieldChecker {
match last {
Expr::Literal(Literal::Integer(limit)) => {
if *limit <= 0 {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"limit ({limit}) for {name} must be greater than 0"
)));
));
}
}
got => {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"expected integer as last argument for {name}, got {got:?}"
)))
))
}
}
let (first, rest) = args.split_first().expect("length >= 1");
if !matches!(first, Expr::VarRef(_)) {
return Err(DataFusionError::Plan(format!(
"expected first argument to be a field for {name}"
)));
return error::query(format!("expected first argument to be a field for {name}"));
}
for expr in rest {
if !matches!(expr, Expr::VarRef(_)) {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"only fields or tags are allow for {name}(), got {expr:?}"
)));
));
}
}
@ -924,15 +892,13 @@ impl FieldChecker {
check_exp_args!(name, 1, 2, args);
match args.get(1) {
Some(Expr::Literal(Literal::Duration(d))) if **d <= 0 => {
return Err(DataFusionError::Plan(format!(
"duration argument must be positive, got {d}"
)))
return error::query(format!("duration argument must be positive, got {d}"))
}
None | Some(Expr::Literal(Literal::Duration(_))) => {}
Some(got) => {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"second argument to {name} must be a duration, got {got:?}"
)))
))
}
}
@ -945,15 +911,13 @@ impl FieldChecker {
match args.get(1) {
Some(Expr::Literal(Literal::Duration(d))) if **d <= 0 => {
return Err(DataFusionError::Plan(format!(
"duration argument must be positive, got {d}"
)))
return error::query(format!("duration argument must be positive, got {d}"))
}
None | Some(Expr::Literal(Literal::Duration(_))) => {}
Some(got) => {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"second argument to {name} must be a duration, got {got:?}"
)))
))
}
}
@ -980,9 +944,9 @@ impl FieldChecker {
let v = lit_integer!("moving_average", args, 1);
if v <= 1 {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"moving_average window must be greater than 1, got {v}"
)));
));
}
self.check_nested_symbol("moving_average", &args[0])
@ -994,22 +958,20 @@ impl FieldChecker {
let v = lit_integer!(name, args, 1);
if v < 1 {
return Err(DataFusionError::Plan(format!(
"{name} period must be greater than 1, got {v}"
)));
return error::query(format!("{name} period must be greater than 1, got {v}"));
}
if let Some(v) = lit_integer!(name, args, 2?) {
match (v, name) {
(v, "triple_exponential_derivative") if v < 1 && v != -1 => {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"{name} hold period must be greater than or equal to 1"
)))
))
}
(v, _) if v < 0 && v != -1 => {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"{name} hold period must be greater than or equal to 0"
)))
))
}
_ => {}
}
@ -1018,9 +980,9 @@ impl FieldChecker {
match lit_string!(name, args, 3?) {
Some("exponential" | "simple") => {}
Some(warmup) => {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"{name} warmup type must be one of: 'exponential', 'simple', got {warmup}"
)))
))
}
None => {}
}
@ -1034,16 +996,14 @@ impl FieldChecker {
let v = lit_integer!(name, args, 1);
if v < 1 {
return Err(DataFusionError::Plan(format!(
"{name} period must be greater than 1, got {v}"
)));
return error::query(format!("{name} period must be greater than 1, got {v}"));
}
if let Some(v) = lit_integer!(name, args, 2?) {
if v < 0 && v != -1 {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"{name} hold period must be greater than or equal to 0"
)));
));
}
}
@ -1056,25 +1016,23 @@ impl FieldChecker {
let v = lit_integer!(name, args, 1);
if v < 1 {
return Err(DataFusionError::Plan(format!(
"{name} period must be greater than 1, got {v}"
)));
return error::query(format!("{name} period must be greater than 1, got {v}"));
}
if let Some(v) = lit_integer!(name, args, 2?) {
if v < 0 && v != -1 {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"{name} hold period must be greater than or equal to 0"
)));
));
}
}
match lit_string!(name, args, 3?) {
Some("none" | "exponential" | "simple") => {}
Some(warmup) => {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"{name} warmup type must be one of: 'none', 'exponential' or 'simple', got {warmup}"
)))
))
}
None => {}
}
@ -1088,15 +1046,13 @@ impl FieldChecker {
match args.get(1) {
Some(Expr::Literal(Literal::Duration(d))) if **d <= 0 => {
return Err(DataFusionError::Plan(format!(
"duration argument must be positive, got {d}"
)))
return error::query(format!("duration argument must be positive, got {d}"))
}
None | Some(Expr::Literal(Literal::Duration(_))) => {}
Some(got) => {
return Err(DataFusionError::Plan(format!(
return error::query(format!(
"second argument to {name} must be a duration, got {got:?}"
)))
))
}
}
@ -1120,26 +1076,20 @@ impl FieldChecker {
let v = lit_integer!(name, args, 1);
if v < 1 {
return Err(DataFusionError::Plan(format!(
"{name} N argument must be greater than 0, got {v}"
)));
return error::query(format!("{name} N argument must be greater than 0, got {v}"));
}
let v = lit_integer!(name, args, 2);
if v < 0 {
return Err(DataFusionError::Plan(format!(
"{name} S argument cannot be negative, got {v}"
)));
return error::query(format!("{name} S argument cannot be negative, got {v}"));
}
match &args[0] {
Expr::Call(_) if !self.has_group_by_time => Err(DataFusionError::Plan(format!(
"{name} aggregate requires a GROUP BY interval"
))),
Expr::Call(_) if !self.has_group_by_time => {
error::query(format!("{name} aggregate requires a GROUP BY interval"))
}
expr @ Expr::Call(_) => self.check_nested_expr(expr),
_ => Err(DataFusionError::Plan(format!(
"must use aggregate function with {name}"
))),
_ => error::query(format!("must use aggregate function with {name}")),
}
}
@ -1168,15 +1118,13 @@ impl FieldChecker {
fn check_nested_symbol(&mut self, name: &str, expr: &Expr) -> Result<()> {
match expr {
Expr::Call(_) if !self.has_group_by_time => Err(DataFusionError::Plan(format!(
"{name} aggregate requires a GROUP BY interval"
))),
Expr::Call(_) => self.check_nested_expr(expr),
_ if self.has_group_by_time && !self.inherited_group_by_time => {
Err(DataFusionError::Plan(format!(
"aggregate function required inside the call to {name}"
)))
Expr::Call(_) if !self.has_group_by_time => {
error::query(format!("{name} aggregate requires a GROUP BY interval"))
}
Expr::Call(_) => self.check_nested_expr(expr),
_ if self.has_group_by_time && !self.inherited_group_by_time => error::query(format!(
"aggregate function required inside the call to {name}"
)),
_ => self.check_symbol(name, expr),
}
}
@ -1189,9 +1137,7 @@ impl FieldChecker {
Expr::Wildcard(_) | Expr::Literal(Literal::Regex(_)) => Err(DataFusionError::External(
"internal: unexpected wildcard or regex".into(),
)),
expr => Err(DataFusionError::Plan(format!(
"expected field argument in {name}(), got {expr:?}"
))),
expr => error::query(format!("expected field argument in {name}(), got {expr:?}")),
}
}
}
@ -1825,14 +1771,14 @@ mod test {
let err = rewrite_statement(&namespace, &stmt).unwrap_err();
assert_eq!(
err.to_string(),
"External error: unsupported expression: contains a wildcard or regular expression"
"Error during planning: unsupported expression: contains a wildcard or regular expression"
);
let stmt = parse_select("SELECT COUNT(*::tag) FROM cpu");
let err = rewrite_statement(&namespace, &stmt).unwrap_err();
assert_eq!(
err.to_string(),
"External error: unable to use tag as wildcard in count()"
"Error during planning: unable to use tag as wildcard in count()"
);
}
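
The two updated assertions above follow directly from the new error constructors: `DataFusionError::External` displays as "External error: …", whereas `DataFusionError::Plan`, which `error::query` presumably wraps, displays as "Error during planning: …". A small illustration, assuming the hypothetical `error` sketch near the top of this page:

// Hypothetical: relies on the sketched `error::map::query` helper above.
let err = error::map::query("unable to use tag as wildcard in count()");
assert_eq!(
    err.to_string(),
    "Error during planning: unable to use tag as wildcard in count()"
);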

View File

@ -1,8 +1,8 @@
//! APIs for testing.
#![cfg(test)]
use crate::plan::SchemaProvider;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
use crate::plan::{error, SchemaProvider};
use datafusion::common::Result as DataFusionResult;
use datafusion::datasource::empty::EmptyTable;
use datafusion::datasource::provider_as_source;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, TableSource};
@ -156,7 +156,7 @@ impl SchemaProvider for MockSchemaProvider {
self.tables
.get(name)
.map(|(t, _)| Arc::clone(t))
.ok_or_else(|| DataFusionError::Plan(format!("measurement does not exist: {name}")))
.ok_or_else(|| error::map::query(format!("measurement does not exist: {name}")))
}
fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {

View File

@ -1,5 +1,6 @@
use crate::plan::error;
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Offset, TimeZone};
use datafusion::common::{DataFusionError, Result};
use datafusion::common::Result;
/// Parse the timestamp string and return a DateTime in UTC.
fn parse_timestamp_utc(s: &str) -> Result<DateTime<FixedOffset>> {
@ -19,7 +20,7 @@ fn parse_timestamp_utc(s: &str) -> Result<DateTime<FixedOffset>> {
.map(|nd| nd.and_time(NaiveTime::default())),
)
.map(|ts| DateTime::from_utc(ts, chrono::Utc.fix()))
.map_err(|_| DataFusionError::Plan("invalid timestamp string".into()))
.map_err(|_| error::map::query("invalid timestamp string"))
}
/// Parse the timestamp string and return a DateTime in the specified timezone.
@ -50,7 +51,7 @@ fn parse_timestamp_tz(s: &str, tz: chrono_tz::Tz) -> Result<DateTime<FixedOffset
.ok_or(())
})
.map(|ts| ts.with_timezone(&ts.offset().fix()))
.map_err(|_| DataFusionError::Plan("invalid timestamp string".into()))
.map_err(|_| error::map::query("invalid timestamp string"))
}
/// Parse the string and return a `DateTime` using a fixed offset.

View File

@ -1,13 +1,11 @@
use crate::plan::util_copy;
use crate::plan::{error, util_copy};
use arrow::datatypes::DataType;
use datafusion::common::tree_node::{TreeNode, VisitRecursion};
use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result};
use datafusion::common::{DFSchema, DFSchemaRef, Result};
use datafusion::logical_expr::utils::expr_as_column_expr;
use datafusion::logical_expr::{coalesce, lit, Expr, ExprSchemable, LogicalPlan, Operator};
use influxdb_influxql_parser::expression::BinaryOperator;
use influxdb_influxql_parser::literal::Number;
use influxdb_influxql_parser::string::Regex;
use once_cell::sync::Lazy;
use query_functions::clean_non_meta_escapes;
use schema::Schema;
use std::sync::Arc;
@ -29,7 +27,7 @@ pub(in crate::plan) fn binary_operator_to_df_operator(op: BinaryOperator) -> Ope
pub(in crate::plan) fn schema_from_df(schema: &DFSchema) -> Result<Schema> {
let s: Arc<arrow::datatypes::Schema> = Arc::new(schema.into());
s.try_into().map_err(|err| {
DataFusionError::Internal(format!(
error::map::internal(format!(
"unable to convert DataFusion schema to IOx schema: {err}"
))
})
@ -53,9 +51,8 @@ impl Schemas {
/// Sanitize an InfluxQL regular expression and create a compiled [`regex::Regex`].
pub(crate) fn parse_regex(re: &Regex) -> Result<regex::Regex> {
let pattern = clean_non_meta_escapes(re.as_str());
regex::Regex::new(&pattern).map_err(|e| {
DataFusionError::External(format!("invalid regular expression '{re}': {e}").into())
})
regex::Regex::new(&pattern)
.map_err(|e| error::map::query(format!("invalid regular expression '{re}': {e}")))
}
/// Returns `n` as a literal expression of the specified `data_type`.
@ -69,9 +66,7 @@ fn number_to_expr(n: &Number, data_type: DataType) -> Result<Expr> {
(Number::Float(v), DataType::UInt64) => lit(*v as u64),
(n, data_type) => {
// The only output data types expected are Int64, Float64 or UInt64
return Err(DataFusionError::Internal(format!(
"no conversion from {n} to {data_type}"
)));
return error::internal(format!("no conversion from {n} to {data_type}"));
}
})
}
@ -119,51 +114,3 @@ pub(crate) fn rebase_expr(
})
}
}
/// Returns `true` if `expr` is an [`Expr::AggregateUDF`] for one of
/// the selector functions.
#[allow(unused)]
pub(crate) fn is_selector_aggregate_udf(expr: &Expr) -> bool {
static FUNCTIONS: Lazy<Vec<&'static str>> = Lazy::new(|| {
vec![
"selector_first",
"selector_last",
"selector_max",
"selector_min",
]
});
matches!(expr, Expr::AggregateUDF { fun, ..} if FUNCTIONS.contains(&fun.name.as_str()))
}
/// Collect all the references to selector functions, such as `selector_last`.
/// They are returned in order of occurrence (depth first), with duplicates omitted.
#[allow(unused)]
pub(crate) fn find_aggregate_selector_exprs(exprs: &[Expr]) -> Vec<Expr> {
exprs
.iter()
.flat_map(|expr| {
// Contains a list of unique selector UDAFs
let mut exprs = vec![];
expr.apply(&mut |expr| {
if is_selector_aggregate_udf(expr) {
if !exprs.contains(expr) {
exprs.push(expr.clone());
}
Ok(VisitRecursion::Skip)
} else {
Ok(VisitRecursion::Continue)
}
})
.unwrap();
exprs
})
.fold(vec![], |mut acc, expr| {
if !acc.contains(&expr) {
acc.push(expr)
}
acc
})
}

View File

@ -7,7 +7,7 @@
//! If these APIs are stabilised and made public, they can be removed from IOx.
//!
//! NOTE
use datafusion::common::{DataFusionError, Result};
use datafusion::common::Result;
use datafusion::logical_expr::{
expr::{
AggregateFunction, Between, BinaryExpr, Case, Cast, Expr, GetIndexedField, GroupingSet,