feat: rewrite missing column references to NULL (#5818)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-10-07 14:05:54 -04:00 committed by GitHub
parent 02e3ab125c
commit 8013781ac2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 172 additions and 14 deletions

View File

@ -12,7 +12,6 @@
pub mod delete_expr;
pub mod delete_predicate;
pub mod rewrite;
pub mod rpc_predicate;
use arrow::{

View File

@ -1,19 +1,23 @@
mod column_rewrite;
mod field_rewrite;
mod measurement_rewrite;
mod rewrite;
mod value_rewrite;
use crate::{rewrite, Predicate};
use crate::Predicate;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::lit;
use datafusion::logical_plan::{
Column, Expr, ExprSchema, ExprSchemable, ExprSimplifiable, SimplifyInfo,
Column, Expr, ExprRewritable, ExprSchema, ExprSchemable, ExprSimplifiable, SimplifyInfo,
};
use observability_deps::tracing::{debug, trace};
use schema::Schema;
use std::collections::BTreeSet;
use std::sync::Arc;
use self::column_rewrite::MissingColumnRewriter;
use self::field_rewrite::FieldProjectionRewriter;
use self::measurement_rewrite::rewrite_measurement_references;
use self::value_rewrite::rewrite_field_value_references;
@ -187,6 +191,7 @@ fn normalize_predicate(
let mut predicate = predicate.clone();
let mut field_projections = FieldProjectionRewriter::new(Arc::clone(&schema));
let mut missing_columums = MissingColumnRewriter::new(Arc::clone(&schema));
let mut field_value_exprs = vec![];
@ -194,24 +199,38 @@ fn normalize_predicate(
.exprs
.into_iter()
.map(|e| {
rewrite_measurement_references(table_name, e)
debug!(?e, "rewriting expr");
let e = rewrite_measurement_references(table_name, e)
.map(|e| log_rewrite(e, "rewrite_measurement_references"))
// Rewrite any references to `_value = some_value` to literal true values.
// Keeps track of these expressions, which can then be used to
// augment field projections with conditions using `CASE` statements.
.and_then(|e| rewrite_field_value_references(&mut field_value_exprs, e))
.map(|e| log_rewrite(e, "rewrite_field_value_references"))
// Rewrite any references to `_field` with a literal
// and keep track of referenced field names to add to
// the field column projection set.
.and_then(|e| field_projections.rewrite_field_exprs(e))
.map(|e| log_rewrite(e, "field_projections"))
// remove references to columns that don't exist in this schema
.and_then(|e| e.rewrite(&mut missing_columums))
.map(|e| log_rewrite(e, "missing_columums"))
// apply IOx specific rewrites (that unlock other simplifications)
.and_then(rewrite::rewrite)
// Call the core DataFusion simplification logic
.map(|e| log_rewrite(e, "rewrite"))
// Call DataFusion simplification logic
.and_then(|e| {
let adapter = SimplifyAdapter::new(schema.as_ref());
// simplify twice to ensure "full" cleanup
e.simplify(&adapter)?.simplify(&adapter)
})
.map(|e| log_rewrite(e, "simplify_expr"))
.and_then(rewrite::simplify_predicate)
.map(|e| log_rewrite(e, "simplify_expr"));
debug!(?e, "rewritten expr");
e
})
// Filter out literal true so is_empty works correctly
.filter(|f| match f {
@ -227,6 +246,11 @@ fn normalize_predicate(
field_projections.add_to_predicate(predicate)
}
fn log_rewrite(expr: Expr, description: &str) -> Expr {
trace!(?expr, %description, "After rewrite");
expr
}
struct SimplifyAdapter<'a> {
schema: &'a Schema,
execution_props: ExecutionProps,
@ -290,9 +314,27 @@ mod tests {
use super::*;
use arrow::datatypes::DataType;
use datafusion::logical_plan::{col, lit};
use datafusion::{
logical_plan::{col, lit},
scalar::ScalarValue,
};
use test_helpers::assert_contains;
#[test]
fn test_normalize_predicate_coerced() {
let schema = schema();
let predicate = normalize_predicate(
"table",
Arc::clone(&schema),
&Predicate::new().with_expr(col("t1").eq(lit("f1"))),
)
.unwrap();
let expected = Predicate::new().with_expr(col("t1").eq(lit("f1")));
assert_eq!(predicate, expected);
}
#[test]
fn test_normalize_predicate_field_rewrite() {
let predicate = normalize_predicate(
@ -336,6 +378,20 @@ mod tests {
assert_eq!(predicate, expected);
}
#[test]
fn test_normalize_predicate_field_non_tag() {
// should treat
let predicate = normalize_predicate(
"table",
schema(),
&Predicate::new().with_expr(col("not_a_tag").eq(lit("blarg"))),
)
.unwrap();
let expected = Predicate::new().with_expr(lit(ScalarValue::Boolean(None)));
assert_eq!(predicate, expected);
}
#[test]
fn test_normalize_predicate_field_rewrite_multi_field_unsupported() {
let err = normalize_predicate(

View File

@ -0,0 +1,99 @@
use std::sync::Arc;
use datafusion::{
error::Result as DataFusionResult, logical_plan::ExprRewriter, prelude::*, scalar::ScalarValue,
};
use schema::Schema;
/// Logic for rewriting expressions from influxrpc that reference non
/// existent columns to NULL
#[derive(Debug)]
pub(crate) struct MissingColumnRewriter {
/// The input schema
schema: Arc<Schema>,
}
impl MissingColumnRewriter {
/// Create a new [`MissingColumnRewriter`] targeting the given schema
pub(crate) fn new(schema: Arc<Schema>) -> Self {
Self { schema }
}
fn column_exists(&self, col: &Column) -> DataFusionResult<bool> {
// todo a real error here (rpc_predicates shouldn't have table/relation qualifiers)
assert!(col.relation.is_none());
if self.schema.find_index_of(&col.name).is_some() {
Ok(true)
} else {
Ok(false)
}
}
}
fn lit_null() -> Expr {
lit(ScalarValue::Utf8(None))
}
impl ExprRewriter for MissingColumnRewriter {
fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
Ok(match expr {
Expr::Column(col) if !self.column_exists(&col)? => lit_null(),
expr => expr,
})
}
}
#[cfg(test)]
mod tests {
use datafusion::{arrow::datatypes::DataType, logical_plan::ExprRewritable};
use schema::SchemaBuilder;
use super::*;
#[test]
fn all_columns_defined_no_rewrite() {
// t1 = "foo"
let expr = col("t1").eq(lit("foo"));
assert_eq!(rewrite(expr.clone()), expr);
// f1 > 1.0
let expr = col("f1").gt(lit(1.0));
assert_eq!(rewrite(expr.clone()), expr);
}
#[test]
fn all_columns_not_defined() {
// non_defined = "foo" --> NULL = "foo"
let expr = col("non_defined").eq(lit("foo"));
let expected = lit_null().eq(lit("foo"));
assert_eq!(rewrite(expr), expected);
// non_defined = 1.4 --> NULL = 1.4
let expr = col("non_defined").eq(lit(1.4));
// No type is inferred so this is a literal null string (even though it maybe should be a literal float)
let expected = lit_null().eq(lit(1.4));
assert_eq!(rewrite(expr), expected);
}
#[test]
fn some_columns_not_defined() {
// t1 = "foo" AND non_defined = "bar" --> t1 = "foo" and NULL = "bar"
let expr = col("t1")
.eq(lit("foo"))
.and(col("non_defined").eq(lit("bar")));
let expected = col("t1").eq(lit("foo")).and(lit_null().eq(lit("bar")));
assert_eq!(rewrite(expr), expected);
}
fn rewrite(expr: Expr) -> Expr {
let schema = SchemaBuilder::new()
.tag("t1")
.field("f1", DataType::Int64)
.build()
.unwrap();
let mut rewriter = MissingColumnRewriter::new(Arc::new(schema));
expr.rewrite(&mut rewriter).unwrap()
}
}

View File

@ -55,8 +55,8 @@ impl FieldProjectionRewriter {
}
}
// Rewrites the predicate. See the description on
// [`FieldProjectionRewriter`] for more details.
/// Rewrites the predicate. See the description on
/// [`FieldProjectionRewriter`] for more details.
pub(crate) fn rewrite_field_exprs(&mut self, expr: Expr) -> DataFusionResult<Expr> {
// for predicates like `A AND B AND C`
// rewrite `A`, `B` and `C` separately and put them back together

View File

@ -205,12 +205,12 @@ async fn test_read_filter_invalid_predicate_case() {
#[tokio::test]
async fn test_read_filter_unknown_column_in_predicate() {
let predicate = Predicate::new()
// mystery_region is not a real column, so this predicate is
// mystery_region and bar are not real columns, so this predicate is
// invalid but IOx should be able to handle it (and produce no results)
.with_expr(
col("baz")
.eq(lit(4i32))
.or(col("bar").and(col("mystery_region").gt(lit(5i32)))),
col("baz").eq(lit(4i32)).or(col("bar")
.eq(lit("baz"))
.and(col("mystery_region").gt(lit(5i32)))),
);
let predicate = InfluxRpcPredicate::new(None, predicate);

View File

@ -906,6 +906,7 @@ mod tests {
let schema = SchemaBuilder::new()
.tag("t1")
.tag("t2")
.tag("host")
.field("foo", DataType::Int64)
.field("bar", DataType::Int64)
.build()

View File

@ -1802,11 +1802,13 @@ mod tests {
// Note multiple tables / measureemnts:
let chunk0 = TestChunk::new("m1")
.with_id(0)
.with_tag_column("state")
.with_tag_column("k1")
.with_tag_column("k2");
let chunk1 = TestChunk::new("m2")
.with_id(1)
.with_tag_column("state")
.with_tag_column("k3")
.with_tag_column("k4");
@ -1826,7 +1828,7 @@ mod tests {
};
let actual_tag_keys = fixture.storage_client.tag_keys(request).await.unwrap();
let expected_tag_keys = vec!["_f(0xff)", "_m(0x00)", "k1", "k2", "k3", "k4"];
let expected_tag_keys = vec!["_f(0xff)", "_m(0x00)", "k1", "k2", "k3", "k4", "state"];
assert_eq!(actual_tag_keys, expected_tag_keys,);
@ -1898,6 +1900,7 @@ mod tests {
.with_tag_column("k0");
let chunk1 = TestChunk::new("m4")
.with_tag_column("state")
.with_tag_column("k1")
.with_tag_column("k2")
.with_tag_column("k3")
@ -1927,7 +1930,7 @@ mod tests {
.measurement_tag_keys(request)
.await
.unwrap();
let expected_tag_keys = vec!["_f(0xff)", "_m(0x00)", "k1", "k2", "k3", "k4"];
let expected_tag_keys = vec!["_f(0xff)", "_m(0x00)", "k1", "k2", "k3", "k4", "state"];
assert_eq!(
actual_tag_keys, expected_tag_keys,