fix: Do not run DataFusion optimizer pass twice (#4809)

* fix: Do not run DataFusion optimizer pass twice

* docs: improve docstring and logging
pull/24376/head
Andrew Lamb 2022-06-08 17:01:22 -04:00 committed by GitHub
parent 46de8d6cb3
commit f34282be2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 56 additions and 34 deletions

View File

@ -744,7 +744,7 @@ impl Compactor {
let ctx = self.exec.new_context(ExecutorType::Reorg);
let physical_plan = ctx
.prepare_plan(&plan)
.create_physical_plan(&plan)
.await
.context(CompactPhysicalPlanSnafu)?;

View File

@ -128,7 +128,7 @@ pub async fn compact(
// Build physical plan
let physical_plan = ctx
.prepare_plan(&logical_plan)
.create_physical_plan(&logical_plan)
.await
.context(PhysicalPlanSnafu {})?;

View File

@ -321,7 +321,7 @@ pub async fn query(
// Build physical plan
let physical_plan = ctx
.prepare_plan(&logical_plan)
.create_physical_plan(&logical_plan)
.await
.context(PhysicalPlanSnafu {})?;

View File

@ -23,7 +23,7 @@ use datafusion::{
prelude::*,
};
use futures::TryStreamExt;
use observability_deps::tracing::{debug, trace};
use observability_deps::tracing::debug;
use trace::{
ctx::SpanContext,
span::{MetaValue, SpanRecorder},
@ -279,23 +279,17 @@ impl IOxSessionContext {
debug!(text=%sql, "planning SQL query");
let logical_plan = ctx.inner.create_logical_plan(sql)?;
debug!(plan=%logical_plan.display_graphviz(), "logical plan");
ctx.prepare_plan(&logical_plan).await
ctx.create_physical_plan(&logical_plan).await
}
/// Prepare (optimize + plan) a pre-created logical plan for execution
pub async fn prepare_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
let mut ctx = self.child_ctx("prepare_plan");
debug!(text=%plan.display_indent_schema(), "prepare_plan: initial plan");
/// Prepare (optimize + plan) a pre-created [`LogicalPlan`] for execution
pub async fn create_physical_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
let mut ctx = self.child_ctx("create_physical_plan");
debug!(text=%plan.display_indent_schema(), "create_physical_plan: initial plan");
let physical_plan = ctx.inner.create_physical_plan(plan).await?;
let plan = ctx.inner.optimize(plan)?;
ctx.recorder.event("optimized plan");
trace!(text=%plan.display_indent_schema(), graphviz=%plan.display_graphviz(), "optimized plan");
let physical_plan = ctx.inner.create_physical_plan(&plan).await?;
ctx.recorder.event("plan to run");
debug!(text=%displayable(physical_plan.as_ref()).indent(), "prepare_plan: plan to run");
ctx.recorder.event("physical plan");
debug!(text=%displayable(physical_plan.as_ref()).indent(), "create_physical_plan: plan to run");
Ok(physical_plan)
}
@ -400,7 +394,7 @@ impl IOxSessionContext {
let tag_columns = Arc::new(tag_columns);
let physical_plan = ctx.prepare_plan(&plan).await?;
let physical_plan = ctx.create_physical_plan(&plan).await?;
let it = ctx.execute_stream(physical_plan).await?;
@ -466,7 +460,7 @@ impl IOxSessionContext {
.map(|plan| {
let ctx = self.child_ctx("to_field_list");
self.run(async move {
let physical_plan = ctx.prepare_plan(&plan).await?;
let physical_plan = ctx.create_physical_plan(&plan).await?;
// TODO: avoid this buffering
let field_list =
@ -523,7 +517,7 @@ impl IOxSessionContext {
.map(|plan| {
let ctx = self.child_ctx("run_logical_plans");
self.run(async move {
let physical_plan = ctx.prepare_plan(&plan).await?;
let physical_plan = ctx.create_physical_plan(&plan).await?;
// TODO: avoid this buffering
ctx.collect(physical_plan).await

View File

@ -69,7 +69,7 @@ mod test {
let executor = Executor::new(1);
let plan = executor
.new_context(ExecutorType::Reorg)
.prepare_plan(&split_plan)
.create_physical_plan(&split_plan)
.await
.unwrap();

View File

@ -342,7 +342,7 @@ mod test {
let executor = Executor::new(1);
let physical_plan = executor
.new_context(ExecutorType::Reorg)
.prepare_plan(&scan_plan)
.create_physical_plan(&scan_plan)
.await
.unwrap();
@ -387,7 +387,7 @@ mod test {
let executor = Executor::new(1);
let physical_plan = executor
.new_context(ExecutorType::Reorg)
.prepare_plan(&compact_plan)
.create_physical_plan(&compact_plan)
.await
.unwrap();
assert_eq!(
@ -440,7 +440,7 @@ mod test {
let executor = Executor::new(1);
let physical_plan = executor
.new_context(ExecutorType::Reorg)
.prepare_plan(&split_plan)
.create_physical_plan(&split_plan)
.await
.unwrap();

View File

@ -3,8 +3,8 @@ use std::sync::Arc;
#[cfg(test)]
use crate::scenarios::{
DbScenario, DbSetup, TwoMeasurements, TwoMeasurementsManyFields, TwoMeasurementsWithDelete,
TwoMeasurementsWithDeleteAll,
DbScenario, DbSetup, EndToEndTest, TwoMeasurements, TwoMeasurementsManyFields,
TwoMeasurementsWithDelete, TwoMeasurementsWithDeleteAll,
};
use crate::{
db::AbstractDb,
@ -15,7 +15,7 @@ use crate::{
TwoMeasurementsMultiSeriesWithDelete, TwoMeasurementsMultiSeriesWithDeleteAll,
},
};
use datafusion::logical_plan::{col, lit, when};
use datafusion::logical_plan::{col, lit, when, Expr};
use iox_query::frontend::influxrpc::InfluxRpcPlanner;
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::Predicate;
@ -187,12 +187,7 @@ async fn test_read_filter_invalid_predicate_case() {
// https://github.com/influxdata/influxdb_iox/issues/3635
// model what happens when a field is treated like a tag
// CASE WHEN system" IS NULL THEN '' ELSE system END = 5;
.add_expr(
when(col("system").is_null(), lit(""))
.otherwise(col("system"))
.unwrap()
.eq(lit(5i32)),
);
.add_expr(make_empty_tag_ref_expr("system").eq(lit(5i32)));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_error = "gRPC planner got error creating predicates: Error during planning: 'Utf8 = Int32' can't be evaluated because there isn't a common type to coerce the types to";
@ -750,6 +745,29 @@ async fn test_read_filter_on_field_single_measurement() {
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_multi_negation() {
// reproducer for https://github.com/influxdata/influxdb_iox/issues/4800
test_helpers::maybe_start_logging();
let host = make_empty_tag_ref_expr("host");
let p1 = host.clone().eq(lit("server01")).or(host.eq(lit("")));
let predicate = Predicate::default().add_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_measurement=attributes, _field=color}\n StringPoints timestamps: [8000], values: [\"blue\"]",
"Series tags={_measurement=cpu_load_short, host=server01, _field=value}\n FloatPoints timestamps: [1000], values: [27.99]",
"Series tags={_measurement=cpu_load_short, host=server01, region=us-east, _field=value}\n FloatPoints timestamps: [3000], values: [1234567.891011]",
"Series tags={_measurement=cpu_load_short, host=server01, region=us-west, _field=value}\n FloatPoints timestamps: [0, 4000], values: [0.64, 3e-6]",
"Series tags={_measurement=status, _field=active}\n BooleanPoints timestamps: [7000], values: [true]",
"Series tags={_measurement=swap, host=server01, name=disk0, _field=in}\n FloatPoints timestamps: [6000], values: [3.0]",
"Series tags={_measurement=swap, host=server01, name=disk0, _field=out}\n FloatPoints timestamps: [6000], values: [4.0]",
];
run_read_filter_test_case(EndToEndTest {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_on_field_multi_measurement() {
test_helpers::maybe_start_logging();
@ -779,3 +797,13 @@ async fn test_read_filter_on_field_multi_measurement() {
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}
/// https://github.com/influxdata/influxdb_iox/issues/3635
/// model what happens when a field is treated like a tag compared to ''
///
/// CASE WHEN system" IS NULL THEN '' ELSE system END
fn make_empty_tag_ref_expr(tag_name: &str) -> Expr {
when(col(tag_name).is_null(), lit(""))
.otherwise(col(tag_name))
.unwrap()
}