fix: Make errors in rewriting return `Error` rather than a `panic` (#3767)

* test: add test for predicate errors

* fix: Return errors properly rather than panic

* fix: handle errors in influxrpc planner

* fix: appease clippy

* fix: tests

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-02-17 10:39:14 -05:00 committed by GitHub
parent 9d25e5bac3
commit 9588b43a90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 189 additions and 67 deletions

View File

@ -894,7 +894,7 @@ mod tests {
}
fn table_predicate(predicate: InfluxRpcPredicate) -> Predicate {
let predicates = predicate.table_predicates(&Tables::new(&["foo"]));
let predicates = predicate.table_predicates(&Tables::new(&["foo"])).unwrap();
assert_eq!(predicates.len(), 1);
predicates.into_iter().next().unwrap().1
}
@ -986,7 +986,7 @@ mod tests {
let tables = Tables::new(&["foo", "bar"]);
let table_predicates = predicate.table_predicates(&tables);
let table_predicates = predicate.table_predicates(&tables).unwrap();
assert_eq!(table_predicates.len(), 2);
for (expected_table, (table, predicate)) in tables.table_names.iter().zip(table_predicates)

View File

@ -2,7 +2,7 @@
//! InfluxDB Storage gRPC API
use crate::{rewrite, BinaryExpr, Predicate};
use datafusion::error::Result as DataFusionResult;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::{
lit, Column, Expr, ExprRewritable, ExprRewriter, ExprSchema, ExprSchemable, ExprSimplifiable,
@ -84,7 +84,7 @@ impl InfluxRpcPredicate {
pub fn table_predicates<D: QueryDatabaseMeta>(
&self,
table_info: &D,
) -> Vec<(String, Predicate)> {
) -> DataFusionResult<Vec<(String, Predicate)>> {
let table_names = match &self.table_names {
Some(table_names) => itertools::Either::Left(table_names.iter().cloned()),
None => itertools::Either::Right(table_info.table_names().into_iter()),
@ -93,9 +93,9 @@ impl InfluxRpcPredicate {
table_names
.map(|table| {
let schema = table_info.table_schema(&table);
let predicate = normalize_predicate(&table, schema, &self.inner);
let predicate = normalize_predicate(&table, schema, &self.inner)?;
(table, predicate)
Ok((table, predicate))
})
.collect()
}
@ -154,7 +154,7 @@ fn normalize_predicate(
table_name: &str,
schema: Option<Arc<Schema>>,
predicate: &Predicate,
) -> Predicate {
) -> DataFusionResult<Predicate> {
let mut predicate = predicate.clone();
let mut field_projections = BTreeSet::new();
let mut field_value_exprs = vec![];
@ -162,35 +162,31 @@ fn normalize_predicate(
predicate.exprs = predicate
.exprs
.into_iter()
.map(|e| rewrite_measurement_references(table_name, e))
// Rewrite any references to `_value = some_value` to literal true values.
// Keeps track of these expressions, which can then be used to
// augment field projections with conditions using `CASE` statements.
.map(|e| rewrite_field_value_references(&mut field_value_exprs, e))
.map(|e| {
// Rewrite any references to `_field = a_field_name` with a literal true
// and keep track of referenced field names to add to the field
// column projection set.
rewrite_field_column_references(&mut field_projections, e)
rewrite_measurement_references(table_name, e)
// Rewrite any references to `_value = some_value` to literal true values.
// Keeps track of these expressions, which can then be used to
// augment field projections with conditions using `CASE` statements.
.and_then(|e| rewrite_field_value_references(&mut field_value_exprs, e))
// Rewrite any references to `_field = a_field_name` with a literal true
// and keep track of referenced field names to add to the field
// column projection set.
.and_then(|e| rewrite_field_column_references(&mut field_projections, e))
// apply IOx specific rewrites (that unlock other simplifications)
.and_then(rewrite::rewrite)
// Call the core DataFusion simplification logic
.and_then(|e| {
if let Some(schema) = &schema {
let adapter = SimplifyAdapter::new(schema.as_ref());
// simplify twice to ensure "full" cleanup
e.simplify(&adapter)?.simplify(&adapter)
} else {
Ok(e)
}
})
.and_then(rewrite::simplify_predicate)
})
.map(|e| {
// apply IOx specific rewrites (that unlock other simplifications)
rewrite::rewrite(e).expect("rewrite failed")
})
.map(|e| {
if let Some(schema) = &schema {
let adapter = SimplifyAdapter::new(schema.as_ref());
// simplify twice to ensure "full" cleanup
e.simplify(&adapter)
.expect("Expression simplificiation round 1 failed")
.simplify(&adapter)
.expect("Expression simplificiation round 2 failed")
} else {
e
}
})
.map(|e| rewrite::simplify_predicate(e).expect("simplify failed"))
.collect::<Vec<_>>();
.collect::<DataFusionResult<Vec<_>>>()?;
// Store any field value (`_value`) expressions on the `Predicate`.
predicate.value_expr = field_value_exprs;
@ -200,7 +196,7 @@ fn normalize_predicate(
None => predicate.field_columns = Some(field_projections),
};
}
predicate
Ok(predicate)
}
struct SimplifyAdapter<'a> {
@ -254,18 +250,17 @@ impl<'a> ExprSchema for SimplifyAdapter<'a> {
fn data_type(&self, col: &Column) -> DataFusionResult<&arrow::datatypes::DataType> {
assert!(col.relation.is_none());
Ok(self
.field(&col.name)
self.field(&col.name)
.map(|f| f.data_type())
.expect("found field for datatype"))
.ok_or_else(|| DataFusionError::Plan(format!("Unknown field {}", &col.name)))
}
}
/// Rewrites all references to the [MEASUREMENT_COLUMN_NAME] column
/// with the actual table name
fn rewrite_measurement_references(table_name: &str, expr: Expr) -> Expr {
fn rewrite_measurement_references(table_name: &str, expr: Expr) -> DataFusionResult<Expr> {
let mut rewriter = MeasurementRewriter { table_name };
expr.rewrite(&mut rewriter).expect("rewrite is infallible")
expr.rewrite(&mut rewriter)
}
struct MeasurementRewriter<'a> {
@ -291,9 +286,12 @@ impl ExprRewriter for MeasurementRewriter<'_> {
/// Rewrites an expression on `_value` as a boolean true literal, pushing any
/// encountered expressions onto `value_exprs` so they can be moved onto column
/// projections.
fn rewrite_field_value_references(value_exprs: &mut Vec<BinaryExpr>, expr: Expr) -> Expr {
fn rewrite_field_value_references(
value_exprs: &mut Vec<BinaryExpr>,
expr: Expr,
) -> DataFusionResult<Expr> {
let mut rewriter = FieldValueRewriter { value_exprs };
expr.rewrite(&mut rewriter).expect("rewrite is infallible")
expr.rewrite(&mut rewriter)
}
struct FieldValueRewriter<'a> {
@ -334,9 +332,9 @@ impl<'a> ExprRewriter for FieldValueRewriter<'a> {
fn rewrite_field_column_references(
field_projections: &'_ mut BTreeSet<String>,
expr: Expr,
) -> Expr {
) -> DataFusionResult<Expr> {
let mut rewriter = FieldColumnRewriter { field_projections };
expr.rewrite(&mut rewriter).expect("rewrite is infallible")
expr.rewrite(&mut rewriter)
}
struct FieldColumnRewriter<'a> {

View File

@ -96,6 +96,11 @@ pub enum Error {
source: crate::provider::Error,
},
#[snafu(display("gRPC planner got error creating predicates: {}", source))]
CreatingPredicates {
source: datafusion::error::DataFusionError,
},
#[snafu(display("gRPC planner got error building plan: {}", source))]
BuildingPlan {
source: datafusion::error::DataFusionError,
@ -227,7 +232,9 @@ impl InfluxRpcPlanner {
// Mapping between table and chunks that need full plan
let mut full_plan_table_chunks = BTreeMap::new();
let table_predicates = rpc_predicate.table_predicates(database);
let table_predicates = rpc_predicate
.table_predicates(database)
.context(CreatingPredicatesSnafu)?;
for (table_name, predicate) in &table_predicates {
// Identify which chunks can answer from its metadata and then record its table,
// and which chunks needs full plan and group them into their table
@ -334,7 +341,9 @@ impl InfluxRpcPlanner {
let mut need_full_plans = BTreeMap::new();
let mut known_columns = BTreeSet::new();
let table_predicates = rpc_predicate.table_predicates(database);
let table_predicates = rpc_predicate
.table_predicates(database)
.context(CreatingPredicatesSnafu)?;
for (table_name, predicate) in &table_predicates {
for chunk in database.chunks(table_name, predicate) {
// If there are delete predicates, we need to scan (or do full plan) the data to eliminate
@ -464,7 +473,9 @@ impl InfluxRpcPlanner {
let mut need_full_plans = BTreeMap::new();
let mut known_values = BTreeSet::new();
let table_predicates = rpc_predicate.table_predicates(database);
let table_predicates = rpc_predicate
.table_predicates(database)
.context(CreatingPredicatesSnafu)?;
for (table_name, predicate) in &table_predicates {
for chunk in database.chunks(table_name, predicate) {
// If there are delete predicates, we need to scan (or do full plan) the data to eliminate
@ -629,7 +640,9 @@ impl InfluxRpcPlanner {
// The executor then figures out which columns have non-null
// values and stops the plan executing once it has them
let table_predicates = rpc_predicate.table_predicates(database);
let table_predicates = rpc_predicate
.table_predicates(database)
.context(CreatingPredicatesSnafu)?;
let mut field_list_plan = FieldListPlan::with_capacity(table_predicates.len());
for (table_name, predicate) in &table_predicates {
@ -680,7 +693,9 @@ impl InfluxRpcPlanner {
{
debug!(?rpc_predicate, "planning read_filter");
let table_predicates = rpc_predicate.table_predicates(database);
let table_predicates = rpc_predicate
.table_predicates(database)
.context(CreatingPredicatesSnafu)?;
let mut ss_plans = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in &table_predicates {
let chunks = database.chunks(table_name, predicate);
@ -736,7 +751,9 @@ impl InfluxRpcPlanner {
{
debug!(?rpc_predicate, ?agg, "planning read_group");
let table_predicates = rpc_predicate.table_predicates(database);
let table_predicates = rpc_predicate
.table_predicates(database)
.context(CreatingPredicatesSnafu)?;
let mut ss_plans = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in &table_predicates {
@ -798,7 +815,9 @@ impl InfluxRpcPlanner {
);
// group tables by chunk, pruning if possible
let table_predicates = rpc_predicate.table_predicates(database);
let table_predicates = rpc_predicate
.table_predicates(database)
.context(CreatingPredicatesSnafu)?;
let mut ss_plans = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in &table_predicates {
let chunks = database.chunks(table_name, predicate);

View File

@ -1,21 +1,25 @@
//! Tests for the Influx gRPC queries
use std::sync::Arc;
#[cfg(test)]
use crate::scenarios::{
DbScenario, DbSetup, NoData, TwoMeasurements, TwoMeasurementsManyFields,
TwoMeasurementsWithDelete, TwoMeasurementsWithDeleteAll,
};
use crate::{
influxrpc::util::run_series_set_plan,
influxrpc::util::run_series_set_plan_maybe_error,
scenarios::{
MeasurementStatusCode, MeasurementsForDefect2845, MeasurementsSortableTags,
MeasurementsSortableTagsWithDelete, TwoMeasurementsMultiSeries,
TwoMeasurementsMultiSeriesWithDelete, TwoMeasurementsMultiSeriesWithDeleteAll,
},
};
use datafusion::logical_plan::{col, lit};
use datafusion::logical_plan::{col, lit, when};
use db::Db;
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::PredicateBuilder;
use query::frontend::influxrpc::InfluxRpcPlanner;
use test_helpers::assert_contains;
/// runs read_filter(predicate) and compares it to the expected
/// output
@ -34,14 +38,9 @@ async fn run_read_filter_test_case<D>(
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::new();
let plan = planner
.read_filter(db.as_ref(), predicate.clone())
.expect("built plan successfully");
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
let string_results = run_series_set_plan(&ctx, plan).await;
let string_results = run_read_filter(predicate.clone(), db)
.await
.expect("Unexpected error running read filter");
assert_eq!(
expected_results, string_results,
@ -51,6 +50,48 @@ async fn run_read_filter_test_case<D>(
}
}
/// Plans and executes `read_filter(predicate)` against `db` and returns the
/// resulting series as strings.
///
/// Both the planning error (from `InfluxRpcPlanner::read_filter`) and the
/// execution error (from `run_series_set_plan_maybe_error`) are converted to
/// their `to_string()` form, so callers can check the message text without
/// naming either error type. No comparison against expected output is done here.
async fn run_read_filter(
predicate: InfluxRpcPredicate,
db: Arc<Db>,
) -> Result<Vec<String>, String> {
let planner = InfluxRpcPlanner::new();
let plan = planner
.read_filter(db.as_ref(), predicate)
.map_err(|e| e.to_string())?;
let ctx = db.executor().new_context(query::exec::ExecutorType::Query);
run_series_set_plan_maybe_error(&ctx, plan)
.await
.map_err(|e| e.to_string())
}
/// Runs `read_filter(predicate)` against every scenario produced by `db_setup`
/// and asserts that each run fails with a message matched against
/// `expected_error` via `assert_contains!`.
///
/// Panics (failing the test) if any scenario unexpectedly succeeds.
async fn run_read_filter_error_case<D>(
db_setup: D,
predicate: InfluxRpcPredicate,
expected_error: &str,
) where
D: DbSetup,
{
test_helpers::maybe_start_logging();
for scenario in db_setup.make().await {
let DbScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
// The predicate is cloned because it is reused for each scenario.
let result = run_read_filter(predicate.clone(), db)
.await
.expect_err("Unexpected success running error case");
assert_contains!(result.to_string(), expected_error);
}
}
#[tokio::test]
async fn test_read_filter_no_data_no_pred() {
let expected_results = vec![] as Vec<&str>;
@ -138,13 +179,65 @@ async fn test_read_filter_data_tag_predicate() {
run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
}
/// A predicate comparing a string tag column with an integer literal must be
/// reported as an error whose message names the types that cannot be coerced.
#[tokio::test]
async fn test_read_filter_invalid_predicate() {
let predicate = PredicateBuilder::new()
// region > 5 (region is a tag(string) column, so this predicate is invalid)
.add_expr(col("region").gt(lit(5i32)))
.build();
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_error = "Error during planning: 'Dictionary(Int32, Utf8) > Int32' can't be evaluated because there isn't a common type to coerce the types to";
run_read_filter_error_case(TwoMeasurements {}, predicate, expected_error).await;
}
/// A type-mismatched `CASE` expression in the predicate must be reported as a
/// "creating predicates" planner error (see the expected message below).
#[tokio::test]
async fn test_read_filter_invalid_predicate_case() {
let predicate = PredicateBuilder::new()
// https://github.com/influxdata/influxdb_iox/issues/3635
// model what happens when a field is treated like a tag
// CASE WHEN system IS NULL THEN '' ELSE system END = 5;
.add_expr(
when(col("system").is_null(), lit(""))
.otherwise(col("system"))
.unwrap()
.eq(lit(5i32)),
)
.build();
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_error = "gRPC planner got error creating predicates: Error during planning: 'Utf8 = Int32' can't be evaluated because there isn't a common type to coerce the types to";
run_read_filter_error_case(TwoMeasurements {}, predicate, expected_error).await;
}
/// A predicate that references a column not present in the data must still
/// run successfully and yield an empty result set rather than an error.
#[tokio::test]
async fn test_read_filter_unknown_column_in_predicate() {
let predicate = PredicateBuilder::new()
// mystery_region is not a real column, so this predicate is
// invalid but IOx should be able to handle it (and produce no results)
.add_expr(
col("baz")
.eq(lit(4i32))
.or(col("bar").and(col("mystery_region").gt(lit(5i32)))),
)
.build();
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![];
run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_no_pred_with_delete() {
let expected_results = vec![
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100], values: [70.4]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
"Series tags={_measurement=h2o, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100], values: [70.4]",
"Series tags={_measurement=h2o, city=LA, state=CA, _field=temp}\n FloatPoints timestamps: [350], values: [90.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=reading}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_measurement=o2, city=Boston, state=MA, _field=temp}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
];
run_read_filter_test_case(

View File

@ -1,3 +1,4 @@
use datafusion::error::DataFusionError;
use query::exec::IOxExecutionContext;
use query::plan::seriesset::SeriesSetPlans;
@ -9,10 +10,21 @@ use query::plan::seriesset::SeriesSetPlans;
/// items are returned.
#[cfg(test)]
pub async fn run_series_set_plan(ctx: &IOxExecutionContext, plans: SeriesSetPlans) -> Vec<String> {
ctx.to_series_and_groups(plans)
run_series_set_plan_maybe_error(ctx, plans)
.await
.expect("running plans")
}
/// Run a series set plan to completion and produce a Result<Vec<String>> representation
#[cfg(test)]
pub async fn run_series_set_plan_maybe_error(
ctx: &IOxExecutionContext,
plans: SeriesSetPlans,
) -> Result<Vec<String>, DataFusionError> {
Ok(ctx
.to_series_and_groups(plans)
.await?
.into_iter()
.map(|series_or_group| series_or_group.to_string())
.collect()
.collect())
}