fix(InfluxQL): treat null tags as `''` rather than `null` in storagerpc queries (#3557)
* fix(InfluxQL): treat null tags as `''` rather than `null` in storage rpc queries * test: add one more case * fix: Update comment Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
ce46bbaada
commit
77b80e7618
|
@ -8,6 +8,8 @@
|
|||
use std::collections::BTreeSet;
|
||||
use std::{convert::TryFrom, fmt};
|
||||
|
||||
use datafusion::error::DataFusionError;
|
||||
use datafusion::logical_plan::when;
|
||||
use datafusion::{
|
||||
logical_plan::{binary_expr, Expr, Operator},
|
||||
prelude::*,
|
||||
|
@ -106,6 +108,12 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("Error converting field_name to utf8: {}", source))]
|
||||
ConvertingFieldName { source: std::string::FromUtf8Error },
|
||||
|
||||
#[snafu(display("Internal error creating CASE from tag_ref '{}: {}", tag_name, source))]
|
||||
InternalCaseConversion {
|
||||
tag_name: String,
|
||||
source: DataFusionError,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -293,6 +301,7 @@ fn convert_simple_node(
|
|||
mut builder: InfluxRpcPredicateBuilder,
|
||||
node: RPCNode,
|
||||
) -> Result<InfluxRpcPredicateBuilder> {
|
||||
// Attempt to identify OR lists
|
||||
if let Ok(in_list) = InList::try_from(&node) {
|
||||
let InList { lhs, value_list } = in_list;
|
||||
|
||||
|
@ -333,7 +342,7 @@ fn flatten_ands(node: RPCNode, mut dst: Vec<RPCNode>) -> Result<Vec<RPCNode>> {
|
|||
|
||||
// Represents a predicate like <expr> IN (option1, option2, option3, ....)
|
||||
//
|
||||
// use `try_from_node1 to convert a tree like as ((expr = option1) OR (expr =
|
||||
// use `try_from_node` to convert a tree like as ((expr = option1) OR (expr =
|
||||
// option2)) or (expr = option3)) ... into such a form
|
||||
#[derive(Debug)]
|
||||
struct InList {
|
||||
|
@ -493,13 +502,38 @@ fn build_node(value: RPCValue, inputs: Vec<Expr>) -> Result<Expr> {
|
|||
RPCValue::UintValue(v) => Ok(lit(v)),
|
||||
RPCValue::FloatValue(f) => Ok(lit(f)),
|
||||
RPCValue::RegexValue(pattern) => Ok(lit(pattern)),
|
||||
RPCValue::TagRefValue(tag_name) => Ok(col(&make_tag_name(tag_name)?)),
|
||||
RPCValue::TagRefValue(tag_name) => build_tag_ref(tag_name),
|
||||
RPCValue::FieldRefValue(field_name) => Ok(col(&field_name)),
|
||||
RPCValue::Logical(logical) => build_logical_node(logical, inputs),
|
||||
RPCValue::Comparison(comparison) => build_comparison_node(comparison, inputs),
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts InfluxRPC nodes like `TagRef(tag_name)`:
|
||||
///
|
||||
/// Special tags (_measurement, _field) -> reference to those names
|
||||
///
|
||||
/// Other tags
|
||||
///
|
||||
/// ```sql
|
||||
/// CASE
|
||||
/// WHEN tag_name IS NULL THEN ''
|
||||
/// ELSE tag_name
|
||||
/// ```
|
||||
///
|
||||
/// As storage predicates such as `TagRef(tag_name) = ''` expect to
|
||||
/// match missing tags which IOx stores as NULL
|
||||
fn build_tag_ref(tag_name: Vec<u8>) -> Result<Expr> {
|
||||
let tag_name = make_tag_name(tag_name)?;
|
||||
|
||||
match tag_name.as_str() {
|
||||
MEASUREMENT_COLUMN_NAME | FIELD_COLUMN_NAME => Ok(col(&tag_name)),
|
||||
_ => when(col(&tag_name).is_null(), lit(""))
|
||||
.otherwise(col(&tag_name))
|
||||
.context(InternalCaseConversionSnafu { tag_name }),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates an expr from a "Logical" Node
|
||||
fn build_logical_node(logical: i32, inputs: Vec<Expr>) -> Result<Expr> {
|
||||
let logical_enum = RPCLogical::from_i32(logical);
|
||||
|
|
|
@ -340,8 +340,6 @@ pub async fn read_filter_regex_operator() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
// fixed in https://github.com/influxdata/influxdb_iox/pull/3557
|
||||
#[ignore]
|
||||
pub async fn read_filter_empty_tag_eq() {
|
||||
do_read_filter_test(
|
||||
vec!["cpu value=1 1000", "cpu,host=server01 value=2 2000"],
|
||||
|
@ -361,8 +359,6 @@ pub async fn read_filter_empty_tag_eq() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
// fixed in https://github.com/influxdata/influxdb_iox/pull/3557
|
||||
#[ignore]
|
||||
pub async fn read_filter_empty_tag_not_regex() {
|
||||
do_read_filter_test(
|
||||
vec!["cpu value=1 1000", "cpu,host=server01 value=2 2000"],
|
||||
|
@ -382,8 +378,6 @@ pub async fn read_filter_empty_tag_not_regex() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
// fixed in https://github.com/influxdata/influxdb_iox/pull/3557
|
||||
#[ignore]
|
||||
pub async fn read_filter_empty_tag_regex() {
|
||||
do_read_filter_test(
|
||||
vec!["cpu value=1 1000", "cpu,host=server01 value=2 2000"],
|
||||
|
|
|
@ -31,11 +31,30 @@ use datafusion::{
|
|||
/// ELSE tag_col = 'cpu'
|
||||
/// END
|
||||
/// ```
|
||||
|
||||
pub fn rewrite(expr: Expr) -> Result<Expr> {
|
||||
expr.rewrite(&mut IOxExprRewriter::new())
|
||||
}
|
||||
|
||||
/// Special purpose `Expr` rewrite rules for an Expr that is used as a predcate.
|
||||
///
|
||||
/// In general the rewrite rules in Datafusion and IOx attempt to
|
||||
/// preserve the sematics of an expression, especially with respect to
|
||||
/// nulls. This means that certain expressions can not be simplified
|
||||
/// (as they may become null)
|
||||
///
|
||||
/// However, for `Expr`s used as filters, only rows for which the
|
||||
/// `Expr` evaluates to 'true' are returned. Those rows for which the
|
||||
/// `Expr` evaluates to `false` OR `null` are filtered out.
|
||||
///
|
||||
/// This function simplifies `Expr`s that are being used as
|
||||
/// predicates.
|
||||
///
|
||||
/// Currently it is special cases, but it would be great to generalize
|
||||
/// it and contribute it back to DataFusion
|
||||
pub fn simplify_predicate(expr: Expr) -> Result<Expr> {
|
||||
expr.rewrite(&mut IOxPredicateRewriter::new())
|
||||
}
|
||||
|
||||
/// see docs on [rewrite]
|
||||
struct IOxExprRewriter {}
|
||||
|
||||
|
@ -145,6 +164,110 @@ fn inline_case(case_on_left: bool, left: Expr, right: Expr, op: Operator) -> Exp
|
|||
}
|
||||
}
|
||||
|
||||
/// see docs on [simplify_predicate]
|
||||
struct IOxPredicateRewriter {}
|
||||
|
||||
impl IOxPredicateRewriter {
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
/// returns the column name for a column expression
|
||||
fn is_col(expr: &Expr) -> Option<&str> {
|
||||
if let Expr::Column(c) = &expr {
|
||||
Some(c.name.as_str())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// returns the column name for an expression like `IS NULL(col)`
|
||||
fn is_col_null(expr: &Expr) -> Option<&str> {
|
||||
if let Expr::IsNull(arg) = &expr {
|
||||
is_col(arg)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// returns the column name for an expression like `IS NOT NULL(col)` or `NOT(IS NULL(col))`
|
||||
fn is_col_not_null(expr: &Expr) -> Option<&str> {
|
||||
match expr {
|
||||
Expr::IsNotNull(arg) => is_col(arg),
|
||||
Expr::Not(arg) => is_col_null(arg),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_lit(expr: &Expr) -> bool {
|
||||
matches!(expr, Expr::Literal(_))
|
||||
}
|
||||
|
||||
/// returns the column name for an expression like `col = <lit>`
|
||||
fn is_col_op_lit(expr: &Expr) -> Option<&str> {
|
||||
match expr {
|
||||
Expr::BinaryExpr { left, op: _, right } if is_lit(right) => is_col(left),
|
||||
Expr::BinaryExpr { left, op: _, right } if is_lit(left) => is_col(right),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
impl ExprRewriter for IOxPredicateRewriter {
|
||||
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
|
||||
// look for this structure:
|
||||
//
|
||||
// NOT(col IS NULL) AND col = 'foo'
|
||||
//
|
||||
// and replace it with
|
||||
//
|
||||
// col = 'foo'
|
||||
//
|
||||
// Proof:
|
||||
// Case 1: col is NULL
|
||||
//
|
||||
// not (NULL IS NULL) AND col = 'foo'
|
||||
// not (true) AND NULL = 'foo'
|
||||
// NULL
|
||||
//
|
||||
// Case 2: col is not NULL and not equal to 'foo'
|
||||
// not (false) AND false
|
||||
// true AND false
|
||||
// false
|
||||
//
|
||||
// Case 3: col is not NULL and equal to 'foo'
|
||||
// not (false) AND true
|
||||
// true AND true
|
||||
// true
|
||||
match expr {
|
||||
Expr::BinaryExpr {
|
||||
left,
|
||||
op: Operator::And,
|
||||
right,
|
||||
} => {
|
||||
if let (Some(coll), Some(colr)) = (is_col_not_null(&left), is_col_op_lit(&right)) {
|
||||
if colr == coll {
|
||||
return Ok(*right);
|
||||
}
|
||||
} else if let (Some(coll), Some(colr)) =
|
||||
(is_col_op_lit(&left), is_col_not_null(&right))
|
||||
{
|
||||
if colr == coll {
|
||||
return Ok(*left);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Expr::BinaryExpr {
|
||||
left,
|
||||
op: Operator::And,
|
||||
right,
|
||||
})
|
||||
}
|
||||
expr => Ok(expr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::ops::Add;
|
||||
|
@ -332,4 +455,49 @@ mod tests {
|
|||
.otherwise(otherwise_expr)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simplify_predicate() {
|
||||
let expr = col("foo").is_null().not().and(col("foo").eq(lit("bar")));
|
||||
let expected = col("foo").eq(lit("bar"));
|
||||
assert_eq!(expected, simplify_predicate(expr).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simplify_predicate_reversed() {
|
||||
let expr = col("foo").eq(lit("bar")).and(col("foo").is_null().not());
|
||||
let expected = col("foo").eq(lit("bar"));
|
||||
assert_eq!(expected, simplify_predicate(expr).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simplify_predicate_different_col() {
|
||||
// only works when col references are the same
|
||||
let expr = col("foo").is_null().not().and(col("foo2").eq(lit("bar")));
|
||||
let expected = expr.clone();
|
||||
assert_eq!(expected, simplify_predicate(expr).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simplify_predicate_different_col_reversed() {
|
||||
// only works when col references are the same
|
||||
let expr = col("foo2").eq(lit("bar")).and(col("foo").is_null().not());
|
||||
let expected = expr.clone();
|
||||
assert_eq!(expected, simplify_predicate(expr).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simplify_predicate_is_not_null() {
|
||||
let expr = col("foo").is_not_null().and(col("foo").eq(lit("bar")));
|
||||
let expected = col("foo").eq(lit("bar"));
|
||||
assert_eq!(expected, simplify_predicate(expr).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simplify_predicate_complex() {
|
||||
// can't rewrite to some thing else fancy on the right
|
||||
let expr = col("foo").is_null().not().and(col("foo").eq(col("foo")));
|
||||
let expected = expr.clone();
|
||||
assert_eq!(expected, simplify_predicate(expr).unwrap());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
//! Interface logic between IOx ['Predicate`] and predicates used by the
|
||||
//! InfluxDB Storage gRPC API
|
||||
use crate::predicate::{BinaryExpr, Predicate};
|
||||
use crate::rewrite;
|
||||
|
||||
use datafusion::error::Result as DataFusionResult;
|
||||
use datafusion::execution::context::ExecutionProps;
|
||||
|
@ -172,14 +173,23 @@ fn normalize_predicate(
|
|||
// column projection set.
|
||||
rewrite_field_column_references(&mut field_projections, e)
|
||||
})
|
||||
.map(|e| {
|
||||
// apply IOx specific rewrites (that unlock other simplifications)
|
||||
rewrite::rewrite(e).expect("rewrite failed")
|
||||
})
|
||||
.map(|e| {
|
||||
if let Some(schema) = &schema {
|
||||
e.simplify(&SimplifyAdapter::new(schema.as_ref()))
|
||||
.expect("Expression simplificiation failed")
|
||||
let adapter = SimplifyAdapter::new(schema.as_ref());
|
||||
// simplify twice to ensure "full" cleanup
|
||||
e.simplify(&adapter)
|
||||
.expect("Expression simplificiation round 1 failed")
|
||||
.simplify(&adapter)
|
||||
.expect("Expression simplificiation round 2 failed")
|
||||
} else {
|
||||
e
|
||||
}
|
||||
})
|
||||
.map(|e| rewrite::simplify_predicate(e).expect("simplify failed"))
|
||||
.collect::<Vec<_>>();
|
||||
// Store any field value (`_value`) expressions on the `Predicate`.
|
||||
predicate.value_expr = field_value_exprs;
|
||||
|
|
|
@ -9,10 +9,9 @@ use data_types::chunk_metadata::ChunkId;
|
|||
use datafusion::{
|
||||
error::{DataFusionError, Result as DatafusionResult},
|
||||
logical_plan::{
|
||||
binary_expr, lit, when, DFSchema, DFSchemaRef, Expr, ExprRewriter, LogicalPlan,
|
||||
binary_expr, col, lit, when, DFSchema, DFSchemaRef, Expr, ExprRewriter, LogicalPlan,
|
||||
LogicalPlanBuilder,
|
||||
},
|
||||
prelude::col,
|
||||
scalar::ScalarValue,
|
||||
};
|
||||
use datafusion_util::AsExpr;
|
||||
|
@ -2039,9 +2038,19 @@ mod tests {
|
|||
.with_time_column(),
|
||||
);
|
||||
|
||||
// this is what happens with a grpc predicate on a tag
|
||||
//
|
||||
// tag(foo) = 'bar' becomes
|
||||
//
|
||||
// CASE WHEN foo IS NULL then '' ELSE foo END = 'bar'
|
||||
//
|
||||
// It is critical to be rewritten foo = 'bar' correctly so
|
||||
// that it can be evaluated quickly
|
||||
let expr = when(col("foo").is_null(), lit(""))
|
||||
.otherwise(col("foo"))
|
||||
.unwrap();
|
||||
let silly_predicate = PredicateBuilder::new()
|
||||
// (foo = 'bar') OR false
|
||||
.add_expr(col("foo").eq(lit("bar")).or(lit(false)))
|
||||
.add_expr(expr.eq(lit("bar")))
|
||||
.build();
|
||||
|
||||
let executor = Arc::new(Executor::new(1));
|
||||
|
@ -2055,11 +2064,10 @@ mod tests {
|
|||
|
||||
let actual_predicate = test_db.get_chunks_predicate();
|
||||
|
||||
// verify that the predicate was rewritten to foo = 'bar'
|
||||
let expected_predicate = PredicateBuilder::new()
|
||||
// (foo = 'bar') OR false
|
||||
.add_expr(col("foo").eq(lit("bar")))
|
||||
.build();
|
||||
// verify that the predicate was rewritten to `foo = 'bar'`
|
||||
let expr = col("foo").eq(lit("bar"));
|
||||
|
||||
let expected_predicate = PredicateBuilder::new().add_expr(expr).build();
|
||||
|
||||
assert_eq!(
|
||||
actual_predicate, expected_predicate,
|
||||
|
|
|
@ -30,7 +30,8 @@ pub trait PruningObserver {
|
|||
}
|
||||
|
||||
/// Given a Vec of prunable items, returns a possibly smaller set
|
||||
/// filtering those that can not pass the predicate.
|
||||
/// filtering those where the predicate can be proven to evaluate to
|
||||
/// `false` for every single row.
|
||||
///
|
||||
/// TODO(raphael): Perhaps this should return `Result<Vec<bool>>` instead of
|
||||
/// the [`PruningObserver`] plumbing
|
||||
|
@ -626,6 +627,67 @@ mod test {
|
|||
assert_eq!(names(&pruned), vec!["chunk2", "chunk3"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pruned_is_null() {
|
||||
test_helpers::maybe_start_logging();
|
||||
// Verify that type of predicate is pruned if column1 is null
|
||||
// (this is a common predicate type created by the INfluxRPC planner)
|
||||
// (NOT column1 IS NULL) AND (column1 = 'bar')
|
||||
let observer = TestObserver::new();
|
||||
// No nulls, can't prune as it has values that are more and less than 'bar'
|
||||
let c1 = Arc::new(
|
||||
TestChunk::new("chunk1").with_tag_column_with_nulls_and_full_stats(
|
||||
"column1",
|
||||
Some("a"),
|
||||
Some("z"),
|
||||
100,
|
||||
None,
|
||||
0,
|
||||
),
|
||||
);
|
||||
|
||||
// Has no nulls, can prune it out based on statistics alone
|
||||
let c2 = Arc::new(
|
||||
TestChunk::new("chunk2").with_tag_column_with_nulls_and_full_stats(
|
||||
"column1",
|
||||
Some("a"),
|
||||
Some("b"),
|
||||
100,
|
||||
None,
|
||||
0,
|
||||
),
|
||||
);
|
||||
|
||||
// Has nulls, can still can prune it out based on statistics alone
|
||||
let c3 = Arc::new(
|
||||
TestChunk::new("chunk3").with_tag_column_with_nulls_and_full_stats(
|
||||
"column1",
|
||||
Some("a"),
|
||||
Some("b"),
|
||||
100,
|
||||
None,
|
||||
1, // that one peksy null!
|
||||
),
|
||||
);
|
||||
|
||||
let predicate = PredicateBuilder::new()
|
||||
.add_expr(
|
||||
col("column1")
|
||||
.is_null()
|
||||
.not()
|
||||
.and(col("column1").eq(lit("bar"))),
|
||||
)
|
||||
.build();
|
||||
|
||||
let chunks = vec![c1, c2, c3];
|
||||
let schema = merge_schema(&chunks);
|
||||
|
||||
let pruned = prune_chunks(&observer, schema, chunks, &predicate);
|
||||
|
||||
assert_eq!(observer.events(), vec!["chunk2: Pruned", "chunk3: Pruned"]);
|
||||
assert_eq!(names(&pruned), vec!["chunk1"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pruned_multi_column() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
|
|
@ -376,8 +376,28 @@ impl TestChunk {
|
|||
count: u64,
|
||||
distinct_count: Option<NonZeroU64>,
|
||||
) -> Self {
|
||||
let column_name = column_name.into();
|
||||
let null_count = 0;
|
||||
self.with_tag_column_with_nulls_and_full_stats(
|
||||
column_name,
|
||||
min,
|
||||
max,
|
||||
count,
|
||||
distinct_count,
|
||||
null_count,
|
||||
)
|
||||
}
|
||||
|
||||
/// Register a tag column with stats with the test chunk
|
||||
pub fn with_tag_column_with_nulls_and_full_stats(
|
||||
self,
|
||||
column_name: impl Into<String>,
|
||||
min: Option<&str>,
|
||||
max: Option<&str>,
|
||||
count: u64,
|
||||
distinct_count: Option<NonZeroU64>,
|
||||
null_count: u64,
|
||||
) -> Self {
|
||||
let column_name = column_name.into();
|
||||
|
||||
// make a new schema with the specified column and
|
||||
// merge it in to any existing schema
|
||||
|
|
Loading…
Reference in New Issue