From f34282be2ca6dc51506d53e94302cf8e8f30963e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 8 Jun 2022 17:01:22 -0400 Subject: [PATCH] fix: Do not run DataFusion optimizer pass twice (#4809) * fix: Do not run DataFusion optimizer pass twice * docs: improve docstring and logging --- compactor/src/compact.rs | 2 +- ingester/src/compact.rs | 2 +- ingester/src/querier_handler.rs | 2 +- iox_query/src/exec/context.rs | 30 +++++++--------- iox_query/src/frontend.rs | 2 +- iox_query/src/frontend/reorg.rs | 6 ++-- query_tests/src/influxrpc/read_filter.rs | 46 +++++++++++++++++++----- 7 files changed, 56 insertions(+), 34 deletions(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 949722354c..47da302d96 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -744,7 +744,7 @@ impl Compactor { let ctx = self.exec.new_context(ExecutorType::Reorg); let physical_plan = ctx - .prepare_plan(&plan) + .create_physical_plan(&plan) .await .context(CompactPhysicalPlanSnafu)?; diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index 2b26d97989..8c8df8171e 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -128,7 +128,7 @@ pub async fn compact( // Build physical plan let physical_plan = ctx - .prepare_plan(&logical_plan) + .create_physical_plan(&logical_plan) .await .context(PhysicalPlanSnafu {})?; diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index 2b62e21995..6819c5a1b2 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -321,7 +321,7 @@ pub async fn query( // Build physical plan let physical_plan = ctx - .prepare_plan(&logical_plan) + .create_physical_plan(&logical_plan) .await .context(PhysicalPlanSnafu {})?; diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 449392ab20..5bcbe3e468 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -23,7 +23,7 @@ use datafusion::{ prelude::*, }; 
use futures::TryStreamExt; -use observability_deps::tracing::{debug, trace}; +use observability_deps::tracing::debug; use trace::{ ctx::SpanContext, span::{MetaValue, SpanRecorder}, @@ -279,23 +279,17 @@ impl IOxSessionContext { debug!(text=%sql, "planning SQL query"); let logical_plan = ctx.inner.create_logical_plan(sql)?; debug!(plan=%logical_plan.display_graphviz(), "logical plan"); - ctx.prepare_plan(&logical_plan).await + ctx.create_physical_plan(&logical_plan).await } - /// Prepare (optimize + plan) a pre-created logical plan for execution - pub async fn prepare_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> { - let mut ctx = self.child_ctx("prepare_plan"); - debug!(text=%plan.display_indent_schema(), "prepare_plan: initial plan"); + /// Prepare (optimize + plan) a pre-created [`LogicalPlan`] for execution + pub async fn create_physical_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> { + let mut ctx = self.child_ctx("create_physical_plan"); + debug!(text=%plan.display_indent_schema(), "create_physical_plan: initial plan"); + let physical_plan = ctx.inner.create_physical_plan(plan).await?; - let plan = ctx.inner.optimize(plan)?; - - ctx.recorder.event("optimized plan"); - trace!(text=%plan.display_indent_schema(), graphviz=%plan.display_graphviz(), "optimized plan"); - - let physical_plan = ctx.inner.create_physical_plan(&plan).await?; - - ctx.recorder.event("plan to run"); - debug!(text=%displayable(physical_plan.as_ref()).indent(), "prepare_plan: plan to run"); + ctx.recorder.event("physical plan"); + debug!(text=%displayable(physical_plan.as_ref()).indent(), "create_physical_plan: plan to run"); Ok(physical_plan) } @@ -400,7 +394,7 @@ impl IOxSessionContext { let tag_columns = Arc::new(tag_columns); - let physical_plan = ctx.prepare_plan(&plan).await?; + let physical_plan = ctx.create_physical_plan(&plan).await?; let it = ctx.execute_stream(physical_plan).await?; @@ -466,7 +460,7 @@ impl IOxSessionContext { .map(|plan| { let ctx = self.child_ctx("to_field_list"); 
self.run(async move { - let physical_plan = ctx.prepare_plan(&plan).await?; + let physical_plan = ctx.create_physical_plan(&plan).await?; // TODO: avoid this buffering let field_list = @@ -523,7 +517,7 @@ impl IOxSessionContext { .map(|plan| { let ctx = self.child_ctx("run_logical_plans"); self.run(async move { - let physical_plan = ctx.prepare_plan(&plan).await?; + let physical_plan = ctx.create_physical_plan(&plan).await?; // TODO: avoid this buffering ctx.collect(physical_plan).await diff --git a/iox_query/src/frontend.rs b/iox_query/src/frontend.rs index 04f333d3f7..02c9cedc4a 100644 --- a/iox_query/src/frontend.rs +++ b/iox_query/src/frontend.rs @@ -69,7 +69,7 @@ mod test { let executor = Executor::new(1); let plan = executor .new_context(ExecutorType::Reorg) - .prepare_plan(&split_plan) + .create_physical_plan(&split_plan) .await .unwrap(); diff --git a/iox_query/src/frontend/reorg.rs b/iox_query/src/frontend/reorg.rs index 130fb0f0e7..e48b851fd2 100644 --- a/iox_query/src/frontend/reorg.rs +++ b/iox_query/src/frontend/reorg.rs @@ -342,7 +342,7 @@ mod test { let executor = Executor::new(1); let physical_plan = executor .new_context(ExecutorType::Reorg) - .prepare_plan(&scan_plan) + .create_physical_plan(&scan_plan) .await .unwrap(); @@ -387,7 +387,7 @@ mod test { let executor = Executor::new(1); let physical_plan = executor .new_context(ExecutorType::Reorg) - .prepare_plan(&compact_plan) + .create_physical_plan(&compact_plan) .await .unwrap(); assert_eq!( @@ -440,7 +440,7 @@ mod test { let executor = Executor::new(1); let physical_plan = executor .new_context(ExecutorType::Reorg) - .prepare_plan(&split_plan) + .create_physical_plan(&split_plan) .await .unwrap(); diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index c200988362..9313e447c7 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -3,8 +3,8 @@ use std::sync::Arc; #[cfg(test)] use crate::scenarios::{ - 
DbScenario, DbSetup, TwoMeasurements, TwoMeasurementsManyFields, TwoMeasurementsWithDelete, - TwoMeasurementsWithDeleteAll, + DbScenario, DbSetup, EndToEndTest, TwoMeasurements, TwoMeasurementsManyFields, + TwoMeasurementsWithDelete, TwoMeasurementsWithDeleteAll, }; use crate::{ db::AbstractDb, @@ -15,7 +15,7 @@ use crate::{ TwoMeasurementsMultiSeriesWithDelete, TwoMeasurementsMultiSeriesWithDeleteAll, }, }; -use datafusion::logical_plan::{col, lit, when}; +use datafusion::logical_plan::{col, lit, when, Expr}; use iox_query::frontend::influxrpc::InfluxRpcPlanner; use predicate::rpc_predicate::InfluxRpcPredicate; use predicate::Predicate; @@ -187,12 +187,7 @@ async fn test_read_filter_invalid_predicate_case() { // https://github.com/influxdata/influxdb_iox/issues/3635 // model what happens when a field is treated like a tag // CASE WHEN system" IS NULL THEN '' ELSE system END = 5; - .add_expr( - when(col("system").is_null(), lit("")) - .otherwise(col("system")) - .unwrap() - .eq(lit(5i32)), - ); + .add_expr(make_empty_tag_ref_expr("system").eq(lit(5i32))); let predicate = InfluxRpcPredicate::new(None, predicate); let expected_error = "gRPC planner got error creating predicates: Error during planning: 'Utf8 = Int32' can't be evaluated because there isn't a common type to coerce the types to"; @@ -750,6 +745,29 @@ async fn test_read_filter_on_field_single_measurement() { run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await; } +#[tokio::test] +async fn test_read_filter_multi_negation() { + // reproducer for https://github.com/influxdata/influxdb_iox/issues/4800 + test_helpers::maybe_start_logging(); + + let host = make_empty_tag_ref_expr("host"); + let p1 = host.clone().eq(lit("server01")).or(host.eq(lit(""))); + let predicate = Predicate::default().add_expr(p1); + let predicate = InfluxRpcPredicate::new(None, predicate); + + let expected_results = vec![ + "Series tags={_measurement=attributes, _field=color}\n StringPoints 
timestamps: [8000], values: [\"blue\"]", + "Series tags={_measurement=cpu_load_short, host=server01, _field=value}\n FloatPoints timestamps: [1000], values: [27.99]", + "Series tags={_measurement=cpu_load_short, host=server01, region=us-east, _field=value}\n FloatPoints timestamps: [3000], values: [1234567.891011]", + "Series tags={_measurement=cpu_load_short, host=server01, region=us-west, _field=value}\n FloatPoints timestamps: [0, 4000], values: [0.64, 3e-6]", + "Series tags={_measurement=status, _field=active}\n BooleanPoints timestamps: [7000], values: [true]", + "Series tags={_measurement=swap, host=server01, name=disk0, _field=in}\n FloatPoints timestamps: [6000], values: [3.0]", + "Series tags={_measurement=swap, host=server01, name=disk0, _field=out}\n FloatPoints timestamps: [6000], values: [4.0]", + ]; + + run_read_filter_test_case(EndToEndTest {}, predicate, expected_results).await; +} + #[tokio::test] async fn test_read_filter_on_field_multi_measurement() { test_helpers::maybe_start_logging(); @@ -779,3 +797,13 @@ async fn test_read_filter_on_field_multi_measurement() { run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await; } + +/// https://github.com/influxdata/influxdb_iox/issues/3635 +/// model what happens when a field is treated like a tag compared to '' +/// +/// CASE WHEN system IS NULL THEN '' ELSE system END +fn make_empty_tag_ref_expr(tag_name: &str) -> Expr { + when(col(tag_name).is_null(), lit("")) + .otherwise(col(tag_name)) + .unwrap() +}