fix: use correct sort key in projection_pushdown (#7718)
* fix: use correct sort key in projection_pushdown * fix: tabs in docs * refactor: Use Serde to format test resultspull/24376/head
@ -114,6 +114,6 @@
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@0 >= 957528000000000000 AND time@0 <= 957531540000000000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[time@0 ASC], projection=[time, user] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[], projection=[time, user] |
| | |
@ -90,7 +90,7 @@
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 |
| | ProjectionExec: expr=[city@0 as name] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[city@0 ASC], projection=[city] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[], projection=[city] |
| | ProjectionExec: expr=[city@1 as city] |
| | DeduplicateExec: [state@2 ASC,city@1 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC] |
@ -77,7 +77,7 @@
| | ParquetExec: limit=None, partitions={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC], projection=[__chunk_order, city, state, time] |
| | ProjectionExec: expr=[city@0 as name] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[city@0 ASC], projection=[city] |
| | ParquetExec: limit=None, partitions={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[], projection=[city] |
| | ProjectionExec: expr=[city@1 as city] |
| | DeduplicateExec: [state@2 ASC,city@1 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC] |
@ -37,7 +37,7 @@
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@0 >= 957528000000000000 AND time@0 <= 957531540000000000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[time@0 ASC], projection=[time, user] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[], projection=[time, user] |
| | |
-- SQL: SELECT date_bin_gapfill(interval '10 minute', time) as minute, count(cpu.user) from cpu where time between timestamp '2000-05-05T12:00:00Z' and timestamp '2000-05-05T12:59:00Z' group by minute;
@ -901,7 +901,7 @@ name: physical_plan
RepartitionExec: partitioning=Hash([Column { name: "tag0", index: 0 }], 4), input_partitions=4
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
AggregateExec: mode=Partial, gby=[tag0@1 as tag0], aggr=[COUNT(m0.f64), SUM(m0.f64), STDDEV(m0.f64)]
ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag0@1 ASC], projection=[f64, tag0]
ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[], projection=[f64, tag0]
ProjectionExec: expr=[m1 as iox::measurement, 0 as time, tag0@0 as tag0, COUNT(m1.f64)@1 as count, SUM(m1.f64)@2 as sum, STDDEV(m1.f64)@3 as stddev]
AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[COUNT(m1.f64), SUM(m1.f64), STDDEV(m1.f64)]
CoalesceBatchesExec: target_batch_size=8192
@ -51,7 +51,7 @@
| logical_plan | Projection: h2o.temp, h2o.other_temp, h2o.time |
| | TableScan: h2o projection=[other_temp, temp, time] |
| physical_plan | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[time@2 ASC], projection=[temp, other_temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[], projection=[temp, other_temp, time] |
| | ProjectionExec: expr=[temp@3 as temp, other_temp@4 as other_temp, time@5 as time] |
| | DeduplicateExec: [city@1 ASC,state@2 ASC,time@5 ASC] |
| | SortPreservingMergeExec: [city@1 ASC,state@2 ASC,time@5 ASC,__chunk_order@0 ASC] |
@ -94,32 +94,14 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
None => column_indices,
let output_ordering = match &child_parquet.base_config().output_ordering {
Some(sort_exprs) => {
let projected_schema = projection_exec.schema();
// filter out sort exprs columns that got projected away
let known_columns = projected_schema
let sort_exprs = sort_exprs
.filter(|expr| {
if let Some(col) = expr.expr.as_any().downcast_ref::<Column>() {
} else {
Some(reassign_sort_exprs_columns(&sort_exprs, &projected_schema)?)
None => None,
let output_ordering = child_parquet
.map(|output_ordering| {
project_output_ordering(output_ordering, projection_exec.schema())
let base_config = FileScanConfig {
projection: Some(projection),
@ -272,6 +254,56 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
/// Given the output ordering and a projected schema, returns the
/// largest prefix of the ordering that is in the projection
/// For example,
/// ```text
/// output_ordering: a, b, c
/// projection: a, c
/// returns --> a
/// ```
/// To see why the input has to be a prefix, consider this input:
/// ```text
/// a b
/// 1 1
/// 2 2
/// 3 1
/// ``
/// It is sorted on `a,b` but *not* sorted on `b`
fn project_output_ordering(
output_ordering: &[PhysicalSortExpr],
projected_schema: SchemaRef,
) -> Result<Vec<PhysicalSortExpr>> {
// filter out sort exprs columns that got projected away
let known_columns = projected_schema
// take longest prefix
let sort_exprs = output_ordering
.take_while(|expr| {
if let Some(col) = expr.expr.as_any().downcast_ref::<Column>() {
} else {
// do not keep exprs like `a+1` or `-a` as they may
// not maintain ordering
reassign_sort_exprs_columns(&sort_exprs, &projected_schema)
fn schema_name_projection(
schema: &SchemaRef,
cols: &[&str],
@ -395,7 +427,7 @@ fn reassign_sort_exprs_columns(
mod tests {
use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
datatypes::{DataType, Field, Fields, Schema, SchemaRef},
use datafusion::{
@ -406,6 +438,7 @@ mod tests {
use serde::Serialize;
use crate::{
physical_optimizer::test_util::{assert_unknown_partitioning, OptimizationTest},
@ -734,7 +767,7 @@ mod tests {
- " ParquetExec: limit=None, partitions={0 groups: []}, predicate=tag1@0 = foo, pruning_predicate=tag1_min@0 <= foo AND foo <= tag1_max@1, output_ordering=[tag3@1 ASC, field@0 ASC, tag2@2 ASC], projection=[field, tag3, tag2]"
- " ParquetExec: limit=None, partitions={0 groups: []}, predicate=tag1@0 = foo, pruning_predicate=tag1_min@0 <= foo AND foo <= tag1_max@1, output_ordering=[tag3@1 ASC, tag2@0 ASC], projection=[tag2, tag3]"
- " ParquetExec: limit=None, partitions={0 groups: []}, predicate=tag1@0 = foo, pruning_predicate=tag1_min@0 <= foo AND foo <= tag1_max@1, output_ordering=[tag3@1 ASC], projection=[tag2, tag3]"
@ -1349,6 +1382,232 @@ mod tests {
fn test_project_output_ordering_keep() {
let schema = schema();
let projection = vec!["tag1", "tag2"];
let output_ordering = vec![
PhysicalSortExpr {
expr: expr_col("tag1", &schema),
options: Default::default(),
PhysicalSortExpr {
expr: expr_col("tag2", &schema),
options: Default::default(),
ProjectOutputOrdering::new(&schema, output_ordering, projection),
- tag1@0
- tag2@1
- tag1
- tag2
- tag1@0
- tag2@1
fn test_project_output_ordering_project_prefix() {
let schema = schema();
let projection = vec!["tag1"]; // prefix of the sort key
let output_ordering = vec![
PhysicalSortExpr {
expr: expr_col("tag1", &schema),
options: Default::default(),
PhysicalSortExpr {
expr: expr_col("tag2", &schema),
options: Default::default(),
ProjectOutputOrdering::new(&schema, output_ordering, projection),
- tag1@0
- tag2@1
- tag1
- tag1@0
fn test_project_output_ordering_project_non_prefix() {
let schema = schema();
let projection = vec!["tag2"]; // in sort key, but not prefix
let output_ordering = vec![
PhysicalSortExpr {
expr: expr_col("tag1", &schema),
options: Default::default(),
PhysicalSortExpr {
expr: expr_col("tag2", &schema),
options: Default::default(),
ProjectOutputOrdering::new(&schema, output_ordering, projection),
- tag1@0
- tag2@1
- tag2
projected_ordering: []
fn test_project_output_ordering_projection_reorder() {
let schema = schema();
let projection = vec!["tag2", "tag1", "field"]; // in different order than sort key
let output_ordering = vec![
PhysicalSortExpr {
expr: expr_col("tag1", &schema),
options: Default::default(),
PhysicalSortExpr {
expr: expr_col("tag2", &schema),
options: Default::default(),
ProjectOutputOrdering::new(&schema, output_ordering, projection),
- tag1@0
- tag2@1
- tag2
- tag1
- field
- tag1@1
- tag2@0
fn test_project_output_ordering_constant() {
let schema = schema();
let projection = vec!["tag2"];
let output_ordering = vec![
// ordering by a constant is ignored
PhysicalSortExpr {
expr: datafusion::physical_plan::expressions::lit(1),
options: Default::default(),
PhysicalSortExpr {
expr: expr_col("tag2", &schema),
options: Default::default(),
ProjectOutputOrdering::new(&schema, output_ordering, projection),
- "1"
- tag2@1
- tag2
projected_ordering: []
fn test_project_output_ordering_constant_second_position() {
let schema = schema();
let projection = vec!["tag2"];
let output_ordering = vec![
PhysicalSortExpr {
expr: expr_col("tag2", &schema),
options: Default::default(),
// ordering by a constant is ignored
PhysicalSortExpr {
expr: datafusion::physical_plan::expressions::lit(1),
options: Default::default(),
ProjectOutputOrdering::new(&schema, output_ordering, projection),
- tag2@1
- "1"
- tag2
- tag2@0
/// project the output_ordering with the projection,
// derive serde to make a nice 'insta' snapshot
#[derive(Debug, Serialize)]
struct ProjectOutputOrdering {
output_ordering: Vec<String>,
projection: Vec<String>,
projected_ordering: Vec<String>,
impl ProjectOutputOrdering {
fn new(
schema: &Schema,
output_ordering: Vec<PhysicalSortExpr>,
projection: Vec<&'static str>,
) -> Self {
let projected_fields: Fields = projection
.map(|field_name| {
.expect("finding field")
let projected_schema = Arc::new(Schema::new(projected_fields));
let projected_ordering = project_output_ordering(&output_ordering, projected_schema);
let projected_ordering = match projected_ordering {
Ok(projected_ordering) => format_sort_exprs(&projected_ordering),
Err(e) => vec![e.to_string()],
Self {
output_ordering: format_sort_exprs(&output_ordering),
projection: projection.iter().map(|s| s.to_string()).collect(),
fn schema() -> SchemaRef {
Field::new("tag1", DataType::Utf8, true),
@ -1357,6 +1616,16 @@ mod tests {
fn format_sort_exprs(sort_exprs: &[PhysicalSortExpr]) -> Vec<String> {
.map(|expr| {
let PhysicalSortExpr { expr, options: _ } = expr;
fn expr_col(name: &str, schema: &SchemaRef) -> Arc<dyn PhysicalExpr> {
Arc::new(Column::new_with_schema(name, schema).unwrap())
Reference in New Issue