From 77b80e7618113a803903fbc6e76829a33030a3e4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 3 Feb 2022 07:14:43 -0500 Subject: [PATCH] fix(InfluxQL): treat null tags as `''` rather than `null` in storagerpc queries (#3557) * fix(InfluxQL): treat null tags as `''` rather than `null` in storage rpc queries * test: add one more case * fix: Update comment Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../server_type/database/rpc/storage/expr.rs | 38 +++- .../tests/end_to_end_cases/influxrpc.rs | 6 - predicate/src/rewrite.rs | 170 +++++++++++++++++- predicate/src/rpc_predicate.rs | 14 +- query/src/frontend/influxrpc.rs | 26 ++- query/src/pruning.rs | 64 ++++++- query/src/test.rs | 22 ++- 7 files changed, 318 insertions(+), 22 deletions(-) diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/expr.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/expr.rs index 7b1a60a9b7..a05fe82ebd 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/expr.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/storage/expr.rs @@ -8,6 +8,8 @@ use std::collections::BTreeSet; use std::{convert::TryFrom, fmt}; +use datafusion::error::DataFusionError; +use datafusion::logical_plan::when; use datafusion::{ logical_plan::{binary_expr, Expr, Operator}, prelude::*, @@ -106,6 +108,12 @@ pub enum Error { #[snafu(display("Error converting field_name to utf8: {}", source))] ConvertingFieldName { source: std::string::FromUtf8Error }, + + #[snafu(display("Internal error creating CASE from tag_ref '{}: {}", tag_name, source))] + InternalCaseConversion { + tag_name: String, + source: DataFusionError, + }, } pub type Result = std::result::Result; @@ -293,6 +301,7 @@ fn convert_simple_node( mut builder: 
InfluxRpcPredicateBuilder, node: RPCNode, ) -> Result { + // Attempt to identify OR lists if let Ok(in_list) = InList::try_from(&node) { let InList { lhs, value_list } = in_list; @@ -333,7 +342,7 @@ fn flatten_ands(node: RPCNode, mut dst: Vec) -> Result> { // Represents a predicate like IN (option1, option2, option3, ....) // -// use `try_from_node1 to convert a tree like as ((expr = option1) OR (expr = +// use `try_from_node` to convert a tree like as ((expr = option1) OR (expr = // option2)) or (expr = option3)) ... into such a form #[derive(Debug)] struct InList { @@ -493,13 +502,38 @@ fn build_node(value: RPCValue, inputs: Vec) -> Result { RPCValue::UintValue(v) => Ok(lit(v)), RPCValue::FloatValue(f) => Ok(lit(f)), RPCValue::RegexValue(pattern) => Ok(lit(pattern)), - RPCValue::TagRefValue(tag_name) => Ok(col(&make_tag_name(tag_name)?)), + RPCValue::TagRefValue(tag_name) => build_tag_ref(tag_name), RPCValue::FieldRefValue(field_name) => Ok(col(&field_name)), RPCValue::Logical(logical) => build_logical_node(logical, inputs), RPCValue::Comparison(comparison) => build_comparison_node(comparison, inputs), } } +/// Converts InfluxRPC nodes like `TagRef(tag_name)`: +/// +/// Special tags (_measurement, _field) -> reference to those names +/// +/// Other tags +/// +/// ```sql +/// CASE +/// WHEN tag_name IS NULL THEN '' +/// ELSE tag_name +/// ``` +/// +/// As storage predicates such as `TagRef(tag_name) = ''` expect to +/// match missing tags which IOx stores as NULL +fn build_tag_ref(tag_name: Vec) -> Result { + let tag_name = make_tag_name(tag_name)?; + + match tag_name.as_str() { + MEASUREMENT_COLUMN_NAME | FIELD_COLUMN_NAME => Ok(col(&tag_name)), + _ => when(col(&tag_name).is_null(), lit("")) + .otherwise(col(&tag_name)) + .context(InternalCaseConversionSnafu { tag_name }), + } +} + /// Creates an expr from a "Logical" Node fn build_logical_node(logical: i32, inputs: Vec) -> Result { let logical_enum = RPCLogical::from_i32(logical); diff --git 
a/influxdb_iox/tests/end_to_end_cases/influxrpc.rs b/influxdb_iox/tests/end_to_end_cases/influxrpc.rs index 41820a99c8..f4eefddfdb 100644 --- a/influxdb_iox/tests/end_to_end_cases/influxrpc.rs +++ b/influxdb_iox/tests/end_to_end_cases/influxrpc.rs @@ -340,8 +340,6 @@ pub async fn read_filter_regex_operator() { } #[tokio::test] -// fixed in https://github.com/influxdata/influxdb_iox/pull/3557 -#[ignore] pub async fn read_filter_empty_tag_eq() { do_read_filter_test( vec!["cpu value=1 1000", "cpu,host=server01 value=2 2000"], @@ -361,8 +359,6 @@ pub async fn read_filter_empty_tag_eq() { } #[tokio::test] -// fixed in https://github.com/influxdata/influxdb_iox/pull/3557 -#[ignore] pub async fn read_filter_empty_tag_not_regex() { do_read_filter_test( vec!["cpu value=1 1000", "cpu,host=server01 value=2 2000"], @@ -382,8 +378,6 @@ pub async fn read_filter_empty_tag_not_regex() { } #[tokio::test] -// fixed in https://github.com/influxdata/influxdb_iox/pull/3557 -#[ignore] pub async fn read_filter_empty_tag_regex() { do_read_filter_test( vec!["cpu value=1 1000", "cpu,host=server01 value=2 2000"], diff --git a/predicate/src/rewrite.rs b/predicate/src/rewrite.rs index 2a940b02f4..2a95d67b67 100644 --- a/predicate/src/rewrite.rs +++ b/predicate/src/rewrite.rs @@ -31,11 +31,30 @@ use datafusion::{ /// ELSE tag_col = 'cpu' /// END /// ``` - pub fn rewrite(expr: Expr) -> Result { expr.rewrite(&mut IOxExprRewriter::new()) } +/// Special purpose `Expr` rewrite rules for an Expr that is used as a predicate. +/// +/// In general the rewrite rules in Datafusion and IOx attempt to +/// preserve the semantics of an expression, especially with respect to +/// nulls. This means that certain expressions can not be simplified +/// (as they may become null) +/// +/// However, for `Expr`s used as filters, only rows for which the +/// `Expr` evaluates to 'true' are returned. Those rows for which the +/// `Expr` evaluates to `false` OR `null` are filtered out. 
+/// +/// This function simplifies `Expr`s that are being used as +/// predicates. +/// +/// Currently it is special cases, but it would be great to generalize +/// it and contribute it back to DataFusion +pub fn simplify_predicate(expr: Expr) -> Result { + expr.rewrite(&mut IOxPredicateRewriter::new()) +} + /// see docs on [rewrite] struct IOxExprRewriter {} @@ -145,6 +164,110 @@ fn inline_case(case_on_left: bool, left: Expr, right: Expr, op: Operator) -> Exp } } +/// see docs on [simplify_predicate] +struct IOxPredicateRewriter {} + +impl IOxPredicateRewriter { + fn new() -> Self { + Self {} + } +} + +/// returns the column name for a column expression +fn is_col(expr: &Expr) -> Option<&str> { + if let Expr::Column(c) = &expr { + Some(c.name.as_str()) + } else { + None + } +} + +/// returns the column name for an expression like `IS NULL(col)` +fn is_col_null(expr: &Expr) -> Option<&str> { + if let Expr::IsNull(arg) = &expr { + is_col(arg) + } else { + None + } +} + +/// returns the column name for an expression like `IS NOT NULL(col)` or `NOT(IS NULL(col))` +fn is_col_not_null(expr: &Expr) -> Option<&str> { + match expr { + Expr::IsNotNull(arg) => is_col(arg), + Expr::Not(arg) => is_col_null(arg), + _ => None, + } +} + +fn is_lit(expr: &Expr) -> bool { + matches!(expr, Expr::Literal(_)) +} + +/// returns the column name for an expression like `col = ` +fn is_col_op_lit(expr: &Expr) -> Option<&str> { + match expr { + Expr::BinaryExpr { left, op: _, right } if is_lit(right) => is_col(left), + Expr::BinaryExpr { left, op: _, right } if is_lit(left) => is_col(right), + _ => None, + } +} + +impl ExprRewriter for IOxPredicateRewriter { + fn mutate(&mut self, expr: Expr) -> Result { + // look for this structure: + // + // NOT(col IS NULL) AND col = 'foo' + // + // and replace it with + // + // col = 'foo' + // + // Proof: + // Case 1: col is NULL + // + // not (NULL IS NULL) AND col = 'foo' + // not (true) AND NULL = 'foo' + // NULL + // + // Case 2: col is not NULL 
and not equal to 'foo' + // not (false) AND false + // true AND false + // false + // + // Case 3: col is not NULL and equal to 'foo' + // not (false) AND true + // true AND true + // true + match expr { + Expr::BinaryExpr { + left, + op: Operator::And, + right, + } => { + if let (Some(coll), Some(colr)) = (is_col_not_null(&left), is_col_op_lit(&right)) { + if colr == coll { + return Ok(*right); + } + } else if let (Some(coll), Some(colr)) = + (is_col_op_lit(&left), is_col_not_null(&right)) + { + if colr == coll { + return Ok(*left); + } + }; + + Ok(Expr::BinaryExpr { + left, + op: Operator::And, + right, + }) + } + expr => Ok(expr), + } + } +} + #[cfg(test)] mod tests { use std::ops::Add; @@ -332,4 +455,49 @@ mod tests { .otherwise(otherwise_expr) .unwrap() } + + #[test] + fn test_simplify_predicate() { + let expr = col("foo").is_null().not().and(col("foo").eq(lit("bar"))); + let expected = col("foo").eq(lit("bar")); + assert_eq!(expected, simplify_predicate(expr).unwrap()); + } + + #[test] + fn test_simplify_predicate_reversed() { + let expr = col("foo").eq(lit("bar")).and(col("foo").is_null().not()); + let expected = col("foo").eq(lit("bar")); + assert_eq!(expected, simplify_predicate(expr).unwrap()); + } + + #[test] + fn test_simplify_predicate_different_col() { + // only works when col references are the same + let expr = col("foo").is_null().not().and(col("foo2").eq(lit("bar"))); + let expected = expr.clone(); + assert_eq!(expected, simplify_predicate(expr).unwrap()); + } + + #[test] + fn test_simplify_predicate_different_col_reversed() { + // only works when col references are the same + let expr = col("foo2").eq(lit("bar")).and(col("foo").is_null().not()); + let expected = expr.clone(); + assert_eq!(expected, simplify_predicate(expr).unwrap()); + } + + #[test] + fn test_simplify_predicate_is_not_null() { + let expr = col("foo").is_not_null().and(col("foo").eq(lit("bar"))); + let expected = col("foo").eq(lit("bar")); + assert_eq!(expected, 
simplify_predicate(expr).unwrap()); + } + + #[test] + fn test_simplify_predicate_complex() { + // can't rewrite to some thing else fancy on the right + let expr = col("foo").is_null().not().and(col("foo").eq(col("foo"))); + let expected = expr.clone(); + assert_eq!(expected, simplify_predicate(expr).unwrap()); + } } diff --git a/predicate/src/rpc_predicate.rs b/predicate/src/rpc_predicate.rs index 60ac6397ac..2004d8e1c4 100644 --- a/predicate/src/rpc_predicate.rs +++ b/predicate/src/rpc_predicate.rs @@ -1,6 +1,7 @@ //! Interface logic between IOx ['Predicate`] and predicates used by the //! InfluxDB Storage gRPC API use crate::predicate::{BinaryExpr, Predicate}; +use crate::rewrite; use datafusion::error::Result as DataFusionResult; use datafusion::execution::context::ExecutionProps; @@ -172,14 +173,23 @@ fn normalize_predicate( // column projection set. rewrite_field_column_references(&mut field_projections, e) }) + .map(|e| { + // apply IOx specific rewrites (that unlock other simplifications) + rewrite::rewrite(e).expect("rewrite failed") + }) .map(|e| { if let Some(schema) = &schema { - e.simplify(&SimplifyAdapter::new(schema.as_ref())) - .expect("Expression simplificiation failed") + let adapter = SimplifyAdapter::new(schema.as_ref()); + // simplify twice to ensure "full" cleanup + e.simplify(&adapter) + .expect("Expression simplificiation round 1 failed") + .simplify(&adapter) + .expect("Expression simplificiation round 2 failed") } else { e } }) + .map(|e| rewrite::simplify_predicate(e).expect("simplify failed")) .collect::>(); // Store any field value (`_value`) expressions on the `Predicate`. 
predicate.value_expr = field_value_exprs; diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 89eebe22fc..5800c73d06 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -9,10 +9,9 @@ use data_types::chunk_metadata::ChunkId; use datafusion::{ error::{DataFusionError, Result as DatafusionResult}, logical_plan::{ - binary_expr, lit, when, DFSchema, DFSchemaRef, Expr, ExprRewriter, LogicalPlan, + binary_expr, col, lit, when, DFSchema, DFSchemaRef, Expr, ExprRewriter, LogicalPlan, LogicalPlanBuilder, }, - prelude::col, scalar::ScalarValue, }; use datafusion_util::AsExpr; @@ -2039,9 +2038,19 @@ mod tests { .with_time_column(), ); + // this is what happens with a grpc predicate on a tag + // + // tag(foo) = 'bar' becomes + // + // CASE WHEN foo IS NULL then '' ELSE foo END = 'bar' + // + // It is critical to be rewritten foo = 'bar' correctly so + // that it can be evaluated quickly + let expr = when(col("foo").is_null(), lit("")) + .otherwise(col("foo")) + .unwrap(); let silly_predicate = PredicateBuilder::new() - // (foo = 'bar') OR false - .add_expr(col("foo").eq(lit("bar")).or(lit(false))) + .add_expr(expr.eq(lit("bar"))) .build(); let executor = Arc::new(Executor::new(1)); @@ -2055,11 +2064,10 @@ mod tests { let actual_predicate = test_db.get_chunks_predicate(); - // verify that the predicate was rewritten to foo = 'bar' - let expected_predicate = PredicateBuilder::new() - // (foo = 'bar') OR false - .add_expr(col("foo").eq(lit("bar"))) - .build(); + // verify that the predicate was rewritten to `foo = 'bar'` + let expr = col("foo").eq(lit("bar")); + + let expected_predicate = PredicateBuilder::new().add_expr(expr).build(); assert_eq!( actual_predicate, expected_predicate, diff --git a/query/src/pruning.rs b/query/src/pruning.rs index 8274e1b97c..0bb4d1c9ce 100644 --- a/query/src/pruning.rs +++ b/query/src/pruning.rs @@ -30,7 +30,8 @@ pub trait PruningObserver { } /// Given a Vec of prunable items, 
returns a possibly smaller set -/// filtering those that can not pass the predicate. +/// filtering those where the predicate can be proven to evaluate to +/// `false` for every single row. /// /// TODO(raphael): Perhaps this should return `Result>` instead of /// the [`PruningObserver`] plumbing @@ -626,6 +627,67 @@ mod test { assert_eq!(names(&pruned), vec!["chunk2", "chunk3"]); } + #[test] + fn test_pruned_is_null() { + test_helpers::maybe_start_logging(); + // Verify that type of predicate is pruned if column1 is null + // (this is a common predicate type created by the InfluxRPC planner) + // (NOT column1 IS NULL) AND (column1 = 'bar') + let observer = TestObserver::new(); + // No nulls, can't prune as it has values that are more and less than 'bar' + let c1 = Arc::new( + TestChunk::new("chunk1").with_tag_column_with_nulls_and_full_stats( + "column1", + Some("a"), + Some("z"), + 100, + None, + 0, + ), + ); + + // Has no nulls, can prune it out based on statistics alone + let c2 = Arc::new( + TestChunk::new("chunk2").with_tag_column_with_nulls_and_full_stats( + "column1", + Some("a"), + Some("b"), + 100, + None, + 0, + ), + ); + + // Has nulls, can still prune it out based on statistics alone + let c3 = Arc::new( + TestChunk::new("chunk3").with_tag_column_with_nulls_and_full_stats( + "column1", + Some("a"), + Some("b"), + 100, + None, + 1, // that one pesky null! 
+ ), + ); + + let predicate = PredicateBuilder::new() + .add_expr( + col("column1") + .is_null() + .not() + .and(col("column1").eq(lit("bar"))), + ) + .build(); + + let chunks = vec![c1, c2, c3]; + let schema = merge_schema(&chunks); + + let pruned = prune_chunks(&observer, schema, chunks, &predicate); + + assert_eq!(observer.events(), vec!["chunk2: Pruned", "chunk3: Pruned"]); + assert_eq!(names(&pruned), vec!["chunk1"]); + } + #[test] fn test_pruned_multi_column() { test_helpers::maybe_start_logging(); diff --git a/query/src/test.rs b/query/src/test.rs index 9ac857324e..7436f5e717 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -376,8 +376,28 @@ impl TestChunk { count: u64, distinct_count: Option, ) -> Self { - let column_name = column_name.into(); let null_count = 0; + self.with_tag_column_with_nulls_and_full_stats( + column_name, + min, + max, + count, + distinct_count, + null_count, + ) + } + + /// Register a tag column with stats with the test chunk + pub fn with_tag_column_with_nulls_and_full_stats( + self, + column_name: impl Into, + min: Option<&str>, + max: Option<&str>, + count: u64, + distinct_count: Option, + null_count: u64, + ) -> Self { + let column_name = column_name.into(); // make a new schema with the specified column and // merge it in to any existing schema