From b840ed0ad9ebe00efba3dc5efec36c83f0e8f4f4 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Thu, 16 Feb 2023 22:03:06 +1100 Subject: [PATCH] fix: Use `as_expr` vs `col` to avoid splitting identifiers with periods (#7011) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- iox_query/src/frontend/influxrpc.rs | 54 +++++++++++++++++++++++++++++ predicate/src/lib.rs | 4 +-- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index 901805c803..e7d32d01ce 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -2467,6 +2467,60 @@ mod tests { .await } + /// Fix to address [IDPE issue #17144][17144] + /// + /// [17144]: https://github.com/influxdata/idpe/issues/17144 + #[tokio::test] + async fn test_idpe_issue_17144() { + let chunk0 = Arc::new( + TestChunk::new("h2o") + .with_id(0) + .with_tag_column("foo") + .with_f64_field_column("foo.bar") // with period + .with_time_column(), + ); + + let executor = Arc::new(Executor::new_testing()); + let test_db = Arc::new(TestDatabase::new(Arc::clone(&executor))); + test_db.add_chunk("my_partition_key", Arc::clone(&chunk0)); + + // verify that _field = 'foo.bar' is rewritten correctly + let predicate = Predicate::new().with_expr( + "_field" + .as_expr() + .eq(lit("foo.bar")) + .and("_value".as_expr().eq(lit(1.2))), + ); + + let rpc_predicate = InfluxRpcPredicate::new(None, predicate); + + let agg = Aggregate::None; + let group_columns = &["foo"]; + let res = InfluxRpcPlanner::new(IOxSessionContext::with_testing()) + .read_group(Arc::clone(&test_db) as _, rpc_predicate, agg, group_columns) + .await + .expect("creating plan"); + assert_eq!(res.plans.len(), 1); + let ssplan = res.plans.first().unwrap(); + insta::assert_snapshot!(ssplan.plan.display_indent_schema().to_string(), @r###" + Projection: h2o.foo, CASE WHEN h2o.foo.bar = Float64(1.2) THEN h2o.foo.bar END AS foo.bar, h2o.time [foo:Dictionary(Int32, Utf8);N, foo.bar:Float64;N, time:Timestamp(Nanosecond, None)] + Sort: h2o.foo ASC NULLS FIRST, h2o.time ASC NULLS FIRST [foo:Dictionary(Int32, Utf8);N, foo.bar:Float64;N, time:Timestamp(Nanosecond, None)] + TableScan: h2o [foo:Dictionary(Int32, Utf8);N, foo.bar:Float64;N, time:Timestamp(Nanosecond, None)] + "###); + + let got_predicate = test_db.get_chunks_predicate(); + let exp_predicate = Predicate::new() + .with_field_columns(vec!["foo.bar"]) + .with_value_expr( + "_value" + .as_expr() + .eq(lit(1.2)) + .try_into() + .expect("failed to convert _value expression"), + ); + assert_eq!(got_predicate, exp_predicate); + } + #[tokio::test] async fn test_predicate_read_window_aggregate() { run_test(|test_db, rpc_predicate| { diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs index cccad5672d..d68208d059 100644 --- a/predicate/src/lib.rs +++ b/predicate/src/lib.rs @@ -33,7 +33,7 @@ use datafusion::{ physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, prelude::{col, lit_timestamp_nano, Expr}, }; -use datafusion_util::{make_range_expr, nullable_schema}; +use datafusion_util::{make_range_expr, nullable_schema, AsExpr}; use observability_deps::tracing::debug; use rpc_predicate::VALUE_COLUMN_NAME; use schema::TIME_COLUMN_NAME; @@ -585,7 +585,7 @@ impl ValueExpr { /// column replaced with the specified column name pub fn replace_col(&self, name: &str) -> Expr { if let Expr::BinaryExpr(BinaryExpr { left: _, op, right }) = &self.expr { - binary_expr(col(name), *op, right.as_ref().clone()) + binary_expr(name.as_expr(), *op, right.as_ref().clone()) } else { unreachable!("Unexpected content in ValueExpr") }