fix: Use `as_expr` vs `col` to avoid splitting identifiers with periods (#7011)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Stuart Carnie 2023-02-16 22:03:06 +11:00 committed by GitHub
parent b11228c72e
commit b840ed0ad9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 2 deletions

View File

@ -2467,6 +2467,60 @@ mod tests {
.await
}
/// Fix to address [IDPE issue #17144][17144]
///
/// [17144]: https://github.com/influxdata/idpe/issues/17144
#[tokio::test]
async fn test_idpe_issue_17144() {
let chunk0 = Arc::new(
TestChunk::new("h2o")
.with_id(0)
.with_tag_column("foo")
.with_f64_field_column("foo.bar") // with period
.with_time_column(),
);
let executor = Arc::new(Executor::new_testing());
let test_db = Arc::new(TestDatabase::new(Arc::clone(&executor)));
test_db.add_chunk("my_partition_key", Arc::clone(&chunk0));
// verify that _field = 'foo.bar' is rewritten correctly
let predicate = Predicate::new().with_expr(
"_field"
.as_expr()
.eq(lit("foo.bar"))
.and("_value".as_expr().eq(lit(1.2))),
);
let rpc_predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::None;
let group_columns = &["foo"];
let res = InfluxRpcPlanner::new(IOxSessionContext::with_testing())
.read_group(Arc::clone(&test_db) as _, rpc_predicate, agg, group_columns)
.await
.expect("creating plan");
assert_eq!(res.plans.len(), 1);
let ssplan = res.plans.first().unwrap();
insta::assert_snapshot!(ssplan.plan.display_indent_schema().to_string(), @r###"
Projection: h2o.foo, CASE WHEN h2o.foo.bar = Float64(1.2) THEN h2o.foo.bar END AS foo.bar, h2o.time [foo:Dictionary(Int32, Utf8);N, foo.bar:Float64;N, time:Timestamp(Nanosecond, None)]
Sort: h2o.foo ASC NULLS FIRST, h2o.time ASC NULLS FIRST [foo:Dictionary(Int32, Utf8);N, foo.bar:Float64;N, time:Timestamp(Nanosecond, None)]
TableScan: h2o [foo:Dictionary(Int32, Utf8);N, foo.bar:Float64;N, time:Timestamp(Nanosecond, None)]
"###);
let got_predicate = test_db.get_chunks_predicate();
let exp_predicate = Predicate::new()
.with_field_columns(vec!["foo.bar"])
.with_value_expr(
"_value"
.as_expr()
.eq(lit(1.2))
.try_into()
.expect("failed to convert _value expression"),
);
assert_eq!(got_predicate, exp_predicate);
}
#[tokio::test]
async fn test_predicate_read_window_aggregate() {
run_test(|test_db, rpc_predicate| {

View File

@ -33,7 +33,7 @@ use datafusion::{
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
prelude::{col, lit_timestamp_nano, Expr},
};
use datafusion_util::{make_range_expr, nullable_schema};
use datafusion_util::{make_range_expr, nullable_schema, AsExpr};
use observability_deps::tracing::debug;
use rpc_predicate::VALUE_COLUMN_NAME;
use schema::TIME_COLUMN_NAME;
@ -585,7 +585,7 @@ impl ValueExpr {
/// column replaced with the specified column name
pub fn replace_col(&self, name: &str) -> Expr {
if let Expr::BinaryExpr(BinaryExpr { left: _, op, right }) = &self.expr {
binary_expr(col(name), *op, right.as_ref().clone())
binary_expr(name.as_expr(), *op, right.as_ref().clone())
} else {
unreachable!("Unexpected content in ValueExpr")
}