diff --git a/influxdb_iox/tests/query_tests2/cases/in/date_bin.sql.expected b/influxdb_iox/tests/query_tests2/cases/in/date_bin.sql.expected
index 000b433ed3..f2f3a6689f 100644
--- a/influxdb_iox/tests/query_tests2/cases/in/date_bin.sql.expected
+++ b/influxdb_iox/tests/query_tests2/cases/in/date_bin.sql.expected
@@ -114,6 +114,6 @@
 | |     RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
 | |       CoalesceBatchesExec: target_batch_size=8192 |
 | |         FilterExec: time@0 >= 957528000000000000 AND time@0 <= 957531540000000000 |
-| |           ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[time@0 ASC], projection=[time, user] |
+| |           ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[], projection=[time, user] |
 | | |
 ----------
\ No newline at end of file
diff --git a/influxdb_iox/tests/query_tests2/cases/in/duplicates_ingester.sql.expected b/influxdb_iox/tests/query_tests2/cases/in/duplicates_ingester.sql.expected
index 8ae8337e3f..1ca1d93cb4 100644
--- a/influxdb_iox/tests/query_tests2/cases/in/duplicates_ingester.sql.expected
+++ b/influxdb_iox/tests/query_tests2/cases/in/duplicates_ingester.sql.expected
@@ -90,7 +90,7 @@
 | |         RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 |
 | |   ProjectionExec: expr=[city@0 as name] |
 | |     UnionExec |
-| |       ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[city@0 ASC], projection=[city] |
+| |       ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[], projection=[city] |
 | |       ProjectionExec: expr=[city@1 as city] |
 | |         DeduplicateExec: [state@2 ASC,city@1 ASC,time@3 ASC] |
 | |           SortPreservingMergeExec: [state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC] |
diff --git a/influxdb_iox/tests/query_tests2/cases/in/duplicates_parquet.sql.expected b/influxdb_iox/tests/query_tests2/cases/in/duplicates_parquet.sql.expected
index 2a71d917d3..e21f14d74d 100644
--- a/influxdb_iox/tests/query_tests2/cases/in/duplicates_parquet.sql.expected
+++ b/influxdb_iox/tests/query_tests2/cases/in/duplicates_parquet.sql.expected
@@ -77,7 +77,7 @@
 | |           ParquetExec: limit=None, partitions={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, output_ordering=[state@2 ASC, city@1 ASC, time@3 ASC, __chunk_order@0 ASC], projection=[__chunk_order, city, state, time] |
 | |   ProjectionExec: expr=[city@0 as name] |
 | |     UnionExec |
-| |       ParquetExec: limit=None, partitions={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[city@0 ASC], projection=[city] |
+| |       ParquetExec: limit=None, partitions={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[], projection=[city] |
 | |       ProjectionExec: expr=[city@1 as city] |
 | |         DeduplicateExec: [state@2 ASC,city@1 ASC,time@3 ASC] |
 | |           SortPreservingMergeExec: [state@2 ASC,city@1 ASC,time@3 ASC,__chunk_order@0 ASC] |
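(Aside, not part of the patch: why `output_ordering=[city@0 ASC]` has to become `[]` in the two `duplicates_*` plans above. Those files are sorted on `(state, city, time, __chunk_order)`, as the sibling `ParquetExec` shows; once `state` is projected away, the surviving `city` column is not sorted on its own. A minimal standalone Rust sketch of the effect:)

```rust
fn main() {
    // Rows sorted by (state, city), mirroring a file sort key.
    let rows = vec![("CA", "Fresno"), ("CA", "San Jose"), ("MA", "Boston")];
    // Sorted on the full (state, city) key...
    assert!(rows.windows(2).all(|w| w[0] <= w[1]));

    // ...but project away `state` and the remaining `city` column
    // ("Fresno", "San Jose", "Boston") is no longer sorted.
    let cities: Vec<&str> = rows.iter().map(|(_, city)| *city).collect();
    assert!(!cities.windows(2).all(|w| w[0] <= w[1]));
}
```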
diff --git a/influxdb_iox/tests/query_tests2/cases/in/gapfill.sql.expected b/influxdb_iox/tests/query_tests2/cases/in/gapfill.sql.expected
index 9db522e1f4..363ae18ffe 100644
--- a/influxdb_iox/tests/query_tests2/cases/in/gapfill.sql.expected
+++ b/influxdb_iox/tests/query_tests2/cases/in/gapfill.sql.expected
@@ -37,7 +37,7 @@
 | |     RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
 | |       CoalesceBatchesExec: target_batch_size=8192 |
 | |         FilterExec: time@0 >= 957528000000000000 AND time@0 <= 957531540000000000 |
-| |           ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[time@0 ASC], projection=[time, user] |
+| |           ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@2 >= 957528000000000000 AND time@2 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, output_ordering=[], projection=[time, user] |
 | | |
 ----------
 -- SQL: SELECT date_bin_gapfill(interval '10 minute', time) as minute, count(cpu.user) from cpu where time between timestamp '2000-05-05T12:00:00Z' and timestamp '2000-05-05T12:59:00Z' group by minute;
diff --git a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected
index 9581b93e23..3c9067e03d 100644
--- a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected
+++ b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected
@@ -901,7 +901,7 @@ name: physical_plan
       RepartitionExec: partitioning=Hash([Column { name: "tag0", index: 0 }], 4), input_partitions=4
         RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
           AggregateExec: mode=Partial, gby=[tag0@1 as tag0], aggr=[COUNT(m0.f64), SUM(m0.f64), STDDEV(m0.f64)]
-            ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag0@1 ASC], projection=[f64, tag0]
+            ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[], projection=[f64, tag0]
   ProjectionExec: expr=[m1 as iox::measurement, 0 as time, tag0@0 as tag0, COUNT(m1.f64)@1 as count, SUM(m1.f64)@2 as sum, STDDEV(m1.f64)@3 as stddev]
     AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[COUNT(m1.f64), SUM(m1.f64), STDDEV(m1.f64)]
      CoalesceBatchesExec: target_batch_size=8192
diff --git a/influxdb_iox/tests/query_tests2/cases/in/several_chunks.sql.expected b/influxdb_iox/tests/query_tests2/cases/in/several_chunks.sql.expected
index d67235eb83..81efbfe35f 100644
--- a/influxdb_iox/tests/query_tests2/cases/in/several_chunks.sql.expected
+++ b/influxdb_iox/tests/query_tests2/cases/in/several_chunks.sql.expected
@@ -51,7 +51,7 @@
 | logical_plan | Projection: h2o.temp, h2o.other_temp, h2o.time |
 | | TableScan: h2o projection=[other_temp, temp, time] |
 | physical_plan | UnionExec |
-| |   ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[time@2 ASC], projection=[temp, other_temp, time] |
+| |   ParquetExec: limit=None, partitions={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[], projection=[temp, other_temp, time] |
 | |   ProjectionExec: expr=[temp@3 as temp, other_temp@4 as other_temp, time@5 as time] |
 | |     DeduplicateExec: [city@1 ASC,state@2 ASC,time@5 ASC] |
 | |       SortPreservingMergeExec: [city@1 ASC,state@2 ASC,time@5 ASC,__chunk_order@0 ASC] |
diff --git a/iox_query/src/physical_optimizer/projection_pushdown.rs b/iox_query/src/physical_optimizer/projection_pushdown.rs
index 4fcf6ab550..002c83b3a8 100644
--- a/iox_query/src/physical_optimizer/projection_pushdown.rs
+++ b/iox_query/src/physical_optimizer/projection_pushdown.rs
@@ -94,32 +94,14 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
                         .collect::<Result<Vec<_>>>()?,
                     None => column_indices,
                 };
-                let output_ordering = match &child_parquet.base_config().output_ordering {
-                    Some(sort_exprs) => {
-                        let projected_schema = projection_exec.schema();
-
-                        // filter out sort exprs columns that got projected away
-                        let known_columns = projected_schema
-                            .all_fields()
-                            .iter()
-                            .map(|f| f.name().as_str())
-                            .collect::<HashSet<_>>();
-                        let sort_exprs = sort_exprs
-                            .iter()
-                            .filter(|expr| {
-                                if let Some(col) = expr.expr.as_any().downcast_ref::<Column>() {
-                                    known_columns.contains(col.name())
-                                } else {
-                                    true
-                                }
-                            })
-                            .cloned()
-                            .collect::<Vec<_>>();
-
-                        Some(reassign_sort_exprs_columns(&sort_exprs, &projected_schema)?)
-                    }
-                    None => None,
-                };
+                let output_ordering = child_parquet
+                    .base_config()
+                    .output_ordering
+                    .as_ref()
+                    .map(|output_ordering| {
+                        project_output_ordering(output_ordering, projection_exec.schema())
+                    })
+                    .transpose()?;
                 let base_config = FileScanConfig {
                     projection: Some(projection),
                     output_ordering,
@@ -272,6 +254,56 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
     }
 }
 
+/// Given the output ordering and a projected schema, returns the
+/// largest prefix of the ordering that is in the projection.
+///
+/// For example,
+///
+/// ```text
+/// output_ordering: a, b, c
+/// projection: a, c
+/// returns --> a
+/// ```
+///
+/// To see why the result has to be a prefix of the ordering, consider this input:
+///
+/// ```text
+/// a b
+/// 1 1
+/// 2 2
+/// 3 1
+/// ```
+///
+/// It is sorted on `a,b` but *not* sorted on `b`.
+fn project_output_ordering(
+    output_ordering: &[PhysicalSortExpr],
+    projected_schema: SchemaRef,
+) -> Result<Vec<PhysicalSortExpr>> {
+    // filter out sort exprs columns that got projected away
+    let known_columns = projected_schema
+        .all_fields()
+        .iter()
+        .map(|f| f.name().as_str())
+        .collect::<HashSet<_>>();
+
+    // take longest prefix
+    let sort_exprs = output_ordering
+        .iter()
+        .take_while(|expr| {
+            if let Some(col) = expr.expr.as_any().downcast_ref::<Column>() {
+                known_columns.contains(col.name())
+            } else {
+                // do not keep exprs like `a+1` or `-a` as they may
+                // not maintain ordering
+                false
+            }
+        })
+        .cloned()
+        .collect::<Vec<_>>();
+
+    reassign_sort_exprs_columns(&sort_exprs, &projected_schema)
+}
+
 fn schema_name_projection(
     schema: &SchemaRef,
     cols: &[&str],
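(Aside, not part of the patch: the gist of the fix, modeled outside DataFusion with plain `&str` column names standing in for `PhysicalSortExpr`; both helper names are hypothetical. The removed code *filtered* the ordering by projection membership, while `project_output_ordering` keeps only the longest *prefix* that survives the projection:)

```rust
/// Old behavior: keep every ordering column that survives the projection.
/// This wrongly keeps non-prefix columns (the bug being fixed).
fn old_projected_ordering<'a>(ordering: &[&'a str], projected: &[&str]) -> Vec<&'a str> {
    ordering.iter().filter(|c| projected.contains(c)).copied().collect()
}

/// New behavior: keep only the longest prefix of the sort key that survives.
fn new_projected_ordering<'a>(ordering: &[&'a str], projected: &[&str]) -> Vec<&'a str> {
    ordering.iter().take_while(|c| projected.contains(c)).copied().collect()
}

fn main() {
    // File sorted on (tag, time); the query projects only `time`.
    let ordering = ["tag", "time"];
    // Old: claims the scan is sorted by `time`, which is false.
    assert_eq!(old_projected_ordering(&ordering, &["time"]), vec!["time"]);
    // New: no valid ordering survives, so none is advertised.
    assert_eq!(new_projected_ordering(&ordering, &["time"]), Vec::<&str>::new());
    // A surviving prefix is kept.
    assert_eq!(new_projected_ordering(&ordering, &["tag"]), vec!["tag"]);
}
```

This is also why the `*.sql.expected` updates above are safe: dropping an ordering only forces a downstream sort, while advertising a false one can silently produce wrong results.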
@@ -395,7 +427,7 @@ fn reassign_sort_exprs_columns(
 mod tests {
     use arrow::{
         compute::SortOptions,
-        datatypes::{DataType, Field, Schema, SchemaRef},
+        datatypes::{DataType, Field, Fields, Schema, SchemaRef},
     };
     use datafusion::{
         datasource::object_store::ObjectStoreUrl,
@@ -406,6 +438,7 @@ mod tests {
         },
         scalar::ScalarValue,
     };
+    use serde::Serialize;
 
     use crate::{
         physical_optimizer::test_util::{assert_unknown_partitioning, OptimizationTest},
@@ -734,7 +767,7 @@ mod tests {
       - "   ParquetExec: limit=None, partitions={0 groups: []}, predicate=tag1@0 = foo, pruning_predicate=tag1_min@0 <= foo AND foo <= tag1_max@1, output_ordering=[tag3@1 ASC, field@0 ASC, tag2@2 ASC], projection=[field, tag3, tag2]"
     output:
       Ok:
-        - " ParquetExec: limit=None, partitions={0 groups: []}, predicate=tag1@0 = foo, pruning_predicate=tag1_min@0 <= foo AND foo <= tag1_max@1, output_ordering=[tag3@1 ASC, tag2@0 ASC], projection=[tag2, tag3]"
+        - " ParquetExec: limit=None, partitions={0 groups: []}, predicate=tag1@0 = foo, pruning_predicate=tag1_min@0 <= foo AND foo <= tag1_max@1, output_ordering=[tag3@1 ASC], projection=[tag2, tag3]"
     "###
     );
 
@@ -1349,6 +1382,232 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_project_output_ordering_keep() {
+        let schema = schema();
+        let projection = vec!["tag1", "tag2"];
+        let output_ordering = vec![
+            PhysicalSortExpr {
+                expr: expr_col("tag1", &schema),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: expr_col("tag2", &schema),
+                options: Default::default(),
+            },
+        ];
+
+        insta::assert_yaml_snapshot!(
+            ProjectOutputOrdering::new(&schema, output_ordering, projection),
+            @r###"
+        ---
+        output_ordering:
+          - tag1@0
+          - tag2@1
+        projection:
+          - tag1
+          - tag2
+        projected_ordering:
+          - tag1@0
+          - tag2@1
+        "###
+        );
+    }
+
+    #[test]
+    fn test_project_output_ordering_project_prefix() {
+        let schema = schema();
+        let projection = vec!["tag1"]; // prefix of the sort key
+        let output_ordering = vec![
+            PhysicalSortExpr {
+                expr: expr_col("tag1", &schema),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: expr_col("tag2", &schema),
+                options: Default::default(),
+            },
+        ];
+
+        insta::assert_yaml_snapshot!(
+            ProjectOutputOrdering::new(&schema, output_ordering, projection),
+            @r###"
+        ---
+        output_ordering:
+          - tag1@0
+          - tag2@1
+        projection:
+          - tag1
+        projected_ordering:
+          - tag1@0
+        "###
+        );
+    }
+
+    #[test]
+    fn test_project_output_ordering_project_non_prefix() {
+        let schema = schema();
+        let projection = vec!["tag2"]; // in sort key, but not prefix
+        let output_ordering = vec![
+            PhysicalSortExpr {
+                expr: expr_col("tag1", &schema),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: expr_col("tag2", &schema),
+                options: Default::default(),
+            },
+        ];
+
+        insta::assert_yaml_snapshot!(
+            ProjectOutputOrdering::new(&schema, output_ordering, projection),
+            @r###"
+        ---
+        output_ordering:
+          - tag1@0
+          - tag2@1
+        projection:
+          - tag2
+        projected_ordering: []
+        "###
+        );
+    }
+
+    #[test]
+    fn test_project_output_ordering_projection_reorder() {
+        let schema = schema();
+        let projection = vec!["tag2", "tag1", "field"]; // in different order than sort key
+        let output_ordering = vec![
+            PhysicalSortExpr {
+                expr: expr_col("tag1", &schema),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: expr_col("tag2", &schema),
+                options: Default::default(),
+            },
+        ];
+
+        insta::assert_yaml_snapshot!(
+            ProjectOutputOrdering::new(&schema, output_ordering, projection),
+            @r###"
+        ---
+        output_ordering:
+          - tag1@0
+          - tag2@1
+        projection:
+          - tag2
+          - tag1
+          - field
+        projected_ordering:
+          - tag1@1
+          - tag2@0
+        "###
+        );
+    }
+
+    #[test]
+    fn test_project_output_ordering_constant() {
+        let schema = schema();
+        let projection = vec!["tag2"];
+        let output_ordering = vec![
+            // ordering by a constant is ignored
+            PhysicalSortExpr {
+                expr: datafusion::physical_plan::expressions::lit(1),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: expr_col("tag2", &schema),
+                options: Default::default(),
+            },
+        ];
+
+        insta::assert_yaml_snapshot!(
+            ProjectOutputOrdering::new(&schema, output_ordering, projection),
+            @r###"
+        ---
+        output_ordering:
+          - "1"
+          - tag2@1
+        projection:
+          - tag2
+        projected_ordering: []
+        "###
+        );
+    }
+    #[test]
+    fn test_project_output_ordering_constant_second_position() {
+        let schema = schema();
+        let projection = vec!["tag2"];
+        let output_ordering = vec![
+            PhysicalSortExpr {
+                expr: expr_col("tag2", &schema),
+                options: Default::default(),
+            },
+            // ordering by a constant is ignored
+            PhysicalSortExpr {
+                expr: datafusion::physical_plan::expressions::lit(1),
+                options: Default::default(),
+            },
+        ];
+
+        insta::assert_yaml_snapshot!(
+            ProjectOutputOrdering::new(&schema, output_ordering, projection),
+            @r###"
+        ---
+        output_ordering:
+          - tag2@1
+          - "1"
+        projection:
+          - tag2
+        projected_ordering:
+          - tag2@0
+        "###
+        );
+    }
+
+    /// Projects the output_ordering with the projection.
+    // (derives serde `Serialize` to make a nice 'insta' snapshot)
+    #[derive(Debug, Serialize)]
+    struct ProjectOutputOrdering {
+        output_ordering: Vec<String>,
+        projection: Vec<String>,
+        projected_ordering: Vec<String>,
+    }
+
+    impl ProjectOutputOrdering {
+        fn new(
+            schema: &Schema,
+            output_ordering: Vec<PhysicalSortExpr>,
+            projection: Vec<&'static str>,
+        ) -> Self {
+            let projected_fields: Fields = projection
+                .iter()
+                .map(|field_name| {
+                    schema
+                        .field_with_name(field_name)
+                        .expect("finding field")
+                        .clone()
+                })
+                .collect();
+            let projected_schema = Arc::new(Schema::new(projected_fields));
+
+            let projected_ordering = project_output_ordering(&output_ordering, projected_schema);
+
+            let projected_ordering = match projected_ordering {
+                Ok(projected_ordering) => format_sort_exprs(&projected_ordering),
+                Err(e) => vec![e.to_string()],
+            };
+
+            Self {
+                output_ordering: format_sort_exprs(&output_ordering),
+                projection: projection.iter().map(|s| s.to_string()).collect(),
+                projected_ordering,
+            }
+        }
+    }
+
     fn schema() -> SchemaRef {
         Arc::new(Schema::new(vec![
             Field::new("tag1", DataType::Utf8, true),
@@ -1357,6 +1616,16 @@
         ]))
     }
 
+    fn format_sort_exprs(sort_exprs: &[PhysicalSortExpr]) -> Vec<String> {
+        sort_exprs
+            .iter()
+            .map(|expr| {
+                let PhysicalSortExpr { expr, options: _ } = expr;
+                expr.to_string()
+            })
+            .collect::<Vec<_>>()
+    }
+
     fn expr_col(name: &str, schema: &SchemaRef) -> Arc<dyn PhysicalExpr> {
         Arc::new(Column::new_with_schema(name, schema).unwrap())
     }
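(Aside, not part of the patch: the `ProjectOutputOrdering` helper above leans on `insta` inline YAML snapshots of a `serde::Serialize` struct. A self-contained sketch of that pattern, assuming dev-dependencies `insta` with its `yaml` feature and `serde` with `derive`; the struct and values here are illustrative only:)

```rust
use serde::Serialize;

#[derive(Debug, Serialize)]
struct OrderingSnapshot {
    output_ordering: Vec<String>,
    projection: Vec<String>,
    projected_ordering: Vec<String>,
}

#[test]
fn yaml_snapshot_pattern() {
    let snap = OrderingSnapshot {
        output_ordering: vec!["tag1@0".into(), "tag2@1".into()],
        projection: vec!["tag2".into()],
        // `tag2` is not a prefix of the sort key, so the ordering is lost.
        projected_ordering: vec![],
    };
    // `cargo test` checks the struct's YAML rendering against the inline
    // snapshot; `cargo insta review` updates it. Empty vectors render as `[]`.
    insta::assert_yaml_snapshot!(snap, @r###"
    ---
    output_ordering:
      - tag1@0
      - tag2@1
    projection:
      - tag2
    projected_ordering: []
    "###);
}
```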