feat: enable parquet predicate pushdown in IOx (#5930)

pull/24376/head
Andrew Lamb 2022-11-02 14:00:47 -04:00 committed by GitHub
parent 3ba0458653
commit 58838e214e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 179 additions and 4 deletions

View File

@ -1,4 +1,9 @@
use datafusion::{config::OPT_COALESCE_TARGET_BATCH_SIZE, prelude::SessionConfig};
use datafusion::{
config::{
OPT_COALESCE_TARGET_BATCH_SIZE, OPT_PARQUET_PUSHDOWN_FILTERS, OPT_PARQUET_REORDER_FILTERS,
},
prelude::SessionConfig,
};
// The default catalog name - this impacts what SQL queries use if not specified
pub const DEFAULT_CATALOG: &str = "public";
@ -18,6 +23,9 @@ pub fn iox_session_config() -> SessionConfig {
OPT_COALESCE_TARGET_BATCH_SIZE,
COALESCE_BATCH_SIZE.try_into().unwrap(),
)
// Enable parquet predicate pushdown optimization
.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true)
.set_bool(OPT_PARQUET_REORDER_FILTERS, true)
.create_default_catalog_and_schema(true)
.with_information_schema(true)
.with_default_catalog_and_schema(DEFAULT_CATALOG, DEFAULT_SCHEMA)

View File

@ -37,12 +37,15 @@ use snafu::{ResultExt, Snafu};
mod adapter;
mod deduplicate;
mod metrics;
pub mod overlap;
mod physical;
use self::overlap::group_potential_duplicates;
pub use deduplicate::{DeduplicateExec, RecordBatchDeduplicator};
pub(crate) use physical::IOxReadFilterNode;
pub use metrics::parquet_metrics;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(

View File

@ -0,0 +1,36 @@
use std::sync::Arc;
use datafusion::physical_plan::{file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan};
use super::IOxReadFilterNode;
/// Recursively retrieve metrics from all ParquetExec's in `plan`
pub fn parquet_metrics(plan: Arc<dyn ExecutionPlan>) -> Vec<MetricsSet> {
let mut output = vec![];
parquet_metrics_impl(plan, &mut output);
output
}
fn parquet_metrics_impl(plan: Arc<dyn ExecutionPlan>, output: &mut Vec<MetricsSet>) {
// Temporarily need to special case `IoxReadFilter` as it
// may create `ParquetExec` during execution.
//
// This can be removed when
// <https://github.com/influxdata/influxdb_iox/issues/5897> is
// completed
if let Some(iox_read_node) = plan.as_any().downcast_ref::<IOxReadFilterNode>() {
for child in iox_read_node.parquet_execs() {
parquet_metrics_impl(child, output)
}
}
if let Some(parquet) = plan.as_any().downcast_ref::<ParquetExec>() {
if let Some(metrics) = parquet.metrics() {
output.push(metrics)
}
}
for child in plan.children() {
parquet_metrics_impl(child, output)
}
}

View File

@ -20,6 +20,7 @@ use datafusion::{
};
use futures::TryStreamExt;
use observability_deps::tracing::trace;
use parking_lot::Mutex;
use predicate::Predicate;
use schema::Schema;
use std::{collections::HashSet, fmt, sync::Arc};
@ -33,9 +34,18 @@ pub(crate) struct IOxReadFilterNode {
iox_schema: Arc<Schema>,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: Predicate,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// remember all ParquetExecs created by this node so we can pass
/// along metrics.
///
/// When we use ParquetExec directly (rather
/// than an IOxReadFilterNode) the metric will be directly
/// available: <https://github.com/influxdata/influxdb_iox/issues/5897>
parquet_execs: Mutex<Vec<Arc<ParquetExec>>>,
// execution context used for tracing
ctx: IOxSessionContext,
}
@ -52,14 +62,21 @@ impl IOxReadFilterNode {
predicate: Predicate,
) -> Self {
Self {
ctx,
table_name,
iox_schema,
chunks,
predicate,
metrics: ExecutionPlanMetricsSet::new(),
parquet_execs: Mutex::new(vec![]),
ctx,
}
}
// Meant for testing -- provide input to the inner parquet execs
// that were created
pub fn parquet_execs(&self) -> Vec<Arc<ParquetExec>> {
self.parquet_execs.lock().to_vec()
}
}
impl ExecutionPlan for IOxReadFilterNode {
@ -101,6 +118,7 @@ impl ExecutionPlan for IOxReadFilterNode {
iox_schema: Arc::clone(&self.iox_schema),
chunks,
predicate: self.predicate.clone(),
parquet_execs: Mutex::new(self.parquet_execs()),
metrics: ExecutionPlanMetricsSet::new(),
};
@ -187,10 +205,19 @@ impl ExecutionPlan for IOxReadFilterNode {
.predicate
.clone()
.with_delete_predicates(&delete_predicates);
let exec = ParquetExec::new(base_config, predicate.filter_expr(), None);
let metadata_size_hint = None;
let exec = Arc::new(ParquetExec::new(
base_config,
predicate.filter_expr(),
metadata_size_hint,
));
self.parquet_execs.lock().push(Arc::clone(&exec));
let stream = RecordBatchStreamAdapter::new(
schema,
futures::stream::once(execute_stream(Arc::new(exec), context)).try_flatten(),
futures::stream::once(execute_stream(exec, context)).try_flatten(),
);
// Note: No SchemaAdapterStream required here because `ParquetExec` already creates NULL columns for us.

View File

@ -15,6 +15,8 @@ mod runner;
#[cfg(test)]
pub mod sql;
#[cfg(test)]
pub mod sql_metrics;
#[cfg(test)]
pub mod table_schema;
pub mod db;

View File

@ -0,0 +1,99 @@
use std::sync::Arc;
use crate::scenarios::{DbScenario, DbSetup, OneMeasurementFourChunksWithDuplicatesParquetOnly};
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
use datafusion::physical_plan::{
display::DisplayableExecutionPlan,
metrics::{MetricValue, MetricsSet},
};
use iox_query::{frontend::sql::SqlQueryPlanner, provider::parquet_metrics};
#[tokio::test]
async fn sql_predicate_pushdown() {
test_helpers::maybe_start_logging();
// parquet pushdown is only relevant for parquet
let db_setup = OneMeasurementFourChunksWithDuplicatesParquetOnly {};
// This predicate should result in rows being pruned, and we verify this with metrics
let sql = "SELECT * from h2o where state = 'MA'".to_string();
let expected = vec![
"+------+---------+----------+----------+-------+--------------------------------+",
"| area | city | max_temp | min_temp | state | time |",
"+------+---------+----------+----------+-------+--------------------------------+",
"| | Andover | 69.2 | | MA | 1970-01-01T00:00:00.000000250Z |",
"| | Boston | | 67.4 | MA | 1970-01-01T00:00:00.000000600Z |",
"| | Boston | | 70.4 | MA | 1970-01-01T00:00:00.000000050Z |",
"| | Boston | 75.4 | 65.4 | MA | 1970-01-01T00:00:00.000000250Z |",
"| | Boston | 82.67 | 65.4 | MA | 1970-01-01T00:00:00.000000400Z |",
"| | Reading | | 53.4 | MA | 1970-01-01T00:00:00.000000250Z |",
"| | Reading | | 60.4 | MA | 1970-01-01T00:00:00.000000600Z |",
"| 742 | Bedford | 78.75 | 71.59 | MA | 1970-01-01T00:00:00.000000150Z |",
"| 742 | Bedford | 88.75 | | MA | 1970-01-01T00:00:00.000000600Z |",
"| 750 | Bedford | 80.75 | 65.22 | MA | 1970-01-01T00:00:00.000000400Z |",
"+------+---------+----------+----------+-------+--------------------------------+",
];
for scenario in db_setup.make().await {
let DbScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("SQL: '{:#?}'", sql);
let planner = SqlQueryPlanner::default();
let ctx = db.new_query_context(None);
let physical_plan = planner
.query(&sql, &ctx)
.await
.expect("built plan successfully");
let results: Vec<RecordBatch> = ctx
.collect(Arc::clone(&physical_plan))
.await
.expect("Running plan");
assert_batches_sorted_eq!(expected, &results);
println!(
"Physical plan:\n\n{}",
DisplayableExecutionPlan::new(physical_plan.as_ref()).indent()
);
// verify that pushdown was enabled and that it filtered rows
let metrics = parquet_metrics(physical_plan);
assert_eq!(
metric_value_sum(&metrics, "pushdown_rows_filtered"),
8,
"Unexpected number of rows filtered in:\n\n{:#?}",
metrics
);
}
}
/// returns the sum of all the metrics with the specified name
/// the returned set.
///
/// Count: returns value
///
/// Panics if no such metric.
fn metric_value_sum(metrics: &[MetricsSet], metric_name: &str) -> usize {
metrics.iter().map(|m| metric_value(m, metric_name)).sum()
}
fn metric_value(metrics: &MetricsSet, metric_name: &str) -> usize {
let sum = metrics
.sum(|m| matches!(m.value(), MetricValue::Count { name, .. } if name == metric_name));
match sum {
Some(MetricValue::Count { count, .. }) => count.value(),
_ => {
panic!(
"Expected metric not found. Looking for '{}' in\n\n{:#?}",
metric_name, metrics
);
}
}
}