diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs
index b5664cb783..e3a6c73ef3 100644
--- a/data_types/src/lib.rs
+++ b/data_types/src/lib.rs
@@ -1362,10 +1362,18 @@ impl ChunkOrder {
     /// The minimum ordering value a chunk could have. Currently only used in testing.
     pub const MIN: Self = Self(0);
 
+    /// The maximum chunk order.
+    pub const MAX: Self = Self(i64::MAX);
+
     /// Create a ChunkOrder from the given value.
     pub fn new(order: i64) -> Self {
         Self(order)
     }
+
+    /// Return the underlying order as an integer.
+    pub fn get(&self) -> i64 {
+        self.0
+    }
 }
 
 /// `PartitionTemplate` is used to compute the partition key of each row that
diff --git a/ingester/src/buffer_tree/partition.rs b/ingester/src/buffer_tree/partition.rs
index 7b2be1b12a..b9219f9d0d 100644
--- a/ingester/src/buffer_tree/partition.rs
+++ b/ingester/src/buffer_tree/partition.rs
@@ -772,7 +772,9 @@ mod tests {
         let input = Arc::new(MemoryExec::try_new(&[batch], schema, projection).unwrap());
 
         // Create and run the deduplicator
-        let exec = Arc::new(iox_query::provider::DeduplicateExec::new(input, sort_keys));
+        let exec = Arc::new(iox_query::provider::DeduplicateExec::new(
+            input, sort_keys, false,
+        ));
         let got = test_collect(Arc::clone(&exec) as Arc<dyn ExecutionPlan>).await;
 
         assert_batches_eq!(expect, &*got);
diff --git a/ingester/src/query_adaptor.rs b/ingester/src/query_adaptor.rs
index babb281aab..56adeadd90 100644
--- a/ingester/src/query_adaptor.rs
+++ b/ingester/src/query_adaptor.rs
@@ -222,7 +222,7 @@ impl QueryChunk for QueryAdaptor {
     }
 
     fn order(&self) -> ChunkOrder {
-        unimplemented!()
+        ChunkOrder::MAX
     }
 
     fn as_any(&self) -> &dyn Any {
diff --git a/ingester2/src/buffer_tree/partition.rs b/ingester2/src/buffer_tree/partition.rs
index cde6235c8d..04fc4ab4c9 100644
--- a/ingester2/src/buffer_tree/partition.rs
+++ b/ingester2/src/buffer_tree/partition.rs
@@ -604,7 +604,7 @@ mod tests {
         let input = Arc::new(MemoryExec::try_new(&[batch], schema, projection).unwrap());
 
         // Create and run the deduplicator
-        let exec = Arc::new(iox_query::provider::DeduplicateExec::new(input, sort_keys));
+        let exec = Arc::new(iox_query::provider::DeduplicateExec::new(input, sort_keys, false));
         let got = test_collect(Arc::clone(&exec) as Arc<dyn ExecutionPlan>).await;
 
         assert_batches_eq!(expect, &*got);
diff --git a/ingester2/src/query_adaptor.rs b/ingester2/src/query_adaptor.rs
index 38e1645391..44b32d43ca 100644
--- a/ingester2/src/query_adaptor.rs
+++ b/ingester2/src/query_adaptor.rs
@@ -201,7 +201,7 @@ impl QueryChunk for QueryAdaptor {
     }
 
     fn order(&self) -> ChunkOrder {
-        unimplemented!()
+        ChunkOrder::MAX
     }
 
     fn as_any(&self) -> &dyn Any {
diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs
index bd79dab538..44f60dd635 100644
--- a/iox_query/src/lib.rs
+++ b/iox_query/src/lib.rs
@@ -10,7 +10,10 @@
     clippy::dbg_macro
 )]
 
-use arrow::record_batch::RecordBatch;
+use arrow::{
+    datatypes::{DataType, Field},
+    record_batch::RecordBatch,
+};
 use async_trait::async_trait;
 use data_types::{ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary};
 use datafusion::{error::DataFusionError, prelude::SessionContext};
@@ -39,6 +42,14 @@ pub mod util;
 pub use frontend::common::ScanPlanBuilder;
 pub use query_functions::group_by::{Aggregate, WindowDuration};
 
+/// The name of the virtual column that represents the chunk order.
+pub const CHUNK_ORDER_COLUMN_NAME: &str = "__chunk_order";
+
+/// Generate [`Field`] for [chunk order column](CHUNK_ORDER_COLUMN_NAME).
+pub fn chunk_order_field() -> Field {
+    Field::new(CHUNK_ORDER_COLUMN_NAME, DataType::Int64, false)
+}
+
 /// Trait for an object (designed to be a Chunk) which can provide
 /// metadata
 pub trait QueryChunkMeta {
diff --git a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs
index 3f0d733a79..3d36ed7819 100644
--- a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs
+++ b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs
@@ -73,7 +73,11 @@ impl PhysicalOptimizerRule for DedupNullColumns {
             );
 
             let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema);
-            return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs))));
+            return Ok(Some(Arc::new(DeduplicateExec::new(
+                child,
+                sort_exprs,
+                dedup_exec.use_chunk_order_col(),
+            ))));
         }
 
         Ok(None)
@@ -95,7 +99,7 @@ mod tests {
 
     use crate::{
         physical_optimizer::{
-            dedup::test_util::{chunk, dedup_plan},
+            dedup::test_util::{chunk, dedup_plan, dedup_plan_with_chunk_order_col},
             test_util::OptimizationTest,
         },
         test::TestChunk,
@@ -147,6 +151,29 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_single_chunk_schema_has_chunk_order_col() {
+        let chunk = chunk(1).with_dummy_parquet_file();
+        let schema = chunk.schema().clone();
+        let plan = dedup_plan_with_chunk_order_col(schema, vec![chunk]);
+        let opt = DedupNullColumns::default();
+        insta::assert_yaml_snapshot!(
+            OptimizationTest::new(plan, opt),
+            @r###"
+        ---
+        input:
+          - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+          - "   UnionExec"
+          - "     ParquetExec: limit=None, partitions={1 group: [[1.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        output:
+          Ok:
+            - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+            - "   UnionExec"
+            - "     ParquetExec: limit=None, partitions={1 group: [[1.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        "###
+        );
+    }
+
     #[test]
     fn test_single_chunk_misses_pk_cols() {
         let chunk = TestChunk::new("table")
diff --git a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs
index ee58dae969..87f713b0ba 100644
--- a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs
+++ b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs
@@ -15,6 +15,7 @@ use crate::{
     physical_optimizer::chunk_extraction::extract_chunks,
     provider::{chunks_to_physical_nodes, DeduplicateExec},
     util::arrow_sort_key_exprs,
+    CHUNK_ORDER_COLUMN_NAME,
 };
 
 /// Determine sort key order of [`DeduplicateExec`].
@@ -80,6 +81,7 @@ impl PhysicalOptimizerRule for DedupSortOrder {
 
                 let mut quorum_sort_key_builder = SortKeyBuilder::default();
                 let mut todo_pk_columns = dedup_exec.sort_columns();
+                todo_pk_columns.remove(CHUNK_ORDER_COLUMN_NAME);
                 while !todo_pk_columns.is_empty() {
                     let candidate_counts = todo_pk_columns.iter().copied().map(|col| {
                         let count = chunk_sort_keys
@@ -133,7 +135,11 @@ impl PhysicalOptimizerRule for DedupSortOrder {
             );
 
             let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &schema);
-            return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs))));
+            return Ok(Some(Arc::new(DeduplicateExec::new(
+                child,
+                sort_exprs,
+                dedup_exec.use_chunk_order_col(),
+            ))));
         }
 
         Ok(None)
@@ -165,7 +171,7 @@ mod tests {
 
     use crate::{
         physical_optimizer::{
-            dedup::test_util::{chunk, dedup_plan},
+            dedup::test_util::{chunk, dedup_plan, dedup_plan_with_chunk_order_col},
             test_util::OptimizationTest,
         },
         test::TestChunk,
@@ -246,6 +252,35 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_single_chunk_with_chunk_order_col() {
+        let chunk = chunk(1)
+            .with_dummy_parquet_file()
+            .with_sort_key(SortKey::from_columns([
+                Arc::from("tag2"),
+                Arc::from("tag1"),
+                Arc::from(TIME_COLUMN_NAME),
+            ]));
+        let schema = chunk.schema().clone();
+        let plan = dedup_plan_with_chunk_order_col(schema, vec![chunk]);
+        let opt = DedupSortOrder::default();
+        insta::assert_yaml_snapshot!(
+            OptimizationTest::new(plan, opt),
+            @r###"
+        ---
+        input:
+          - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+          - "   UnionExec"
+          - "     ParquetExec: limit=None, partitions={1 group: [[1.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        output:
+          Ok:
+            - " DeduplicateExec: [tag2@2 ASC,tag1@1 ASC,time@3 ASC]"
+            - "   UnionExec"
+            - "     ParquetExec: limit=None, partitions={1 group: [[1.parquet]]}, output_ordering=[tag2@2 ASC, tag1@1 ASC, time@3 ASC, __chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        "###
+        );
+    }
+
     #[test]
     fn test_unusual_time_order() {
         let chunk = chunk(1)
diff --git a/iox_query/src/physical_optimizer/dedup/partition_split.rs b/iox_query/src/physical_optimizer/dedup/partition_split.rs
index 9817b69a88..cbc562e6f0 100644
--- a/iox_query/src/physical_optimizer/dedup/partition_split.rs
+++ b/iox_query/src/physical_optimizer/dedup/partition_split.rs
@@ -89,6 +89,7 @@ impl PhysicalOptimizerRule for PartitionSplit {
                                 config.execution.target_partitions,
                             ),
                             dedup_exec.sort_keys().to_vec(),
+                            dedup_exec.use_chunk_order_col(),
                         )) as _
                     })
                     .collect(),
diff --git a/iox_query/src/physical_optimizer/dedup/test_util.rs b/iox_query/src/physical_optimizer/dedup/test_util.rs
index 25fff9131a..10b8e623bd 100644
--- a/iox_query/src/physical_optimizer/dedup/test_util.rs
+++ b/iox_query/src/physical_optimizer/dedup/test_util.rs
@@ -1,10 +1,12 @@
 use std::sync::Arc;
 
+use arrow::datatypes::Schema as ArrowSchema;
 use datafusion::physical_plan::ExecutionPlan;
 use predicate::Predicate;
 use schema::Schema;
 
 use crate::{
+    chunk_order_field,
     provider::{chunks_to_physical_nodes, DeduplicateExec},
     test::TestChunk,
     util::arrow_sort_key_exprs,
@@ -12,15 +14,43 @@ use crate::{
 };
 
 pub fn dedup_plan(schema: Schema, chunks: Vec<TestChunk>) -> Arc<dyn ExecutionPlan> {
+    dedup_plan_impl(schema, chunks, false)
+}
+
+pub fn dedup_plan_with_chunk_order_col(
+    schema: Schema,
+    chunks: Vec<TestChunk>,
+) -> Arc<dyn ExecutionPlan> {
+    dedup_plan_impl(schema, chunks, true)
+}
+
+fn dedup_plan_impl(
+    schema: Schema,
+    chunks: Vec<TestChunk>,
+    use_chunk_order_col: bool,
+) -> Arc<dyn ExecutionPlan> {
     let chunks = chunks
         .into_iter()
         .map(|c| Arc::new(c) as _)
         .collect::<Vec<Arc<dyn QueryChunk>>>();
-    let plan = chunks_to_physical_nodes(&schema.as_arrow(), None, chunks, Predicate::new(), 2);
+    let arrow_schema = if use_chunk_order_col {
+        Arc::new(ArrowSchema::new(
+            schema
+                .as_arrow()
+                .fields
+                .iter()
+                .cloned()
+                .chain(std::iter::once(chunk_order_field()))
+                .collect(),
+        ))
+    } else {
+        schema.as_arrow()
+    };
+    let plan = chunks_to_physical_nodes(&arrow_schema, None, chunks, Predicate::new(), 2);
 
     let sort_key = schema::sort::SortKey::from_columns(schema.primary_key());
-    let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema.as_arrow());
-    Arc::new(DeduplicateExec::new(plan, sort_exprs))
+    let sort_exprs = arrow_sort_key_exprs(&sort_key, &plan.schema());
+    Arc::new(DeduplicateExec::new(plan, sort_exprs, use_chunk_order_col))
 }
 
 pub fn chunk(id: u128) -> TestChunk {
diff --git a/iox_query/src/physical_optimizer/dedup/time_split.rs b/iox_query/src/physical_optimizer/dedup/time_split.rs
index 6f83c512e6..61f9aa4022 100644
--- a/iox_query/src/physical_optimizer/dedup/time_split.rs
+++ b/iox_query/src/physical_optimizer/dedup/time_split.rs
@@ -76,6 +76,7 @@ impl PhysicalOptimizerRule for TimeSplit {
                                 config.execution.target_partitions,
                             ),
                             dedup_exec.sort_keys().to_vec(),
+                            dedup_exec.use_chunk_order_col(),
                         )) as _
                     })
                     .collect(),
diff --git a/iox_query/src/physical_optimizer/predicate_pushdown.rs b/iox_query/src/physical_optimizer/predicate_pushdown.rs
index f86ffe67e9..0386a8f337 100644
--- a/iox_query/src/physical_optimizer/predicate_pushdown.rs
+++ b/iox_query/src/physical_optimizer/predicate_pushdown.rs
@@ -104,6 +104,7 @@ impl PhysicalOptimizerRule for PredicatePushdown {
                             grandchild,
                         )?),
                         child_dedup.sort_keys().to_vec(),
+                        child_dedup.use_chunk_order_col(),
                     ));
                     if !no_pushdown.is_empty() {
                         new_node = Arc::new(FilterExec::try_new(
@@ -354,6 +355,7 @@ mod tests {
                 Arc::new(DeduplicateExec::new(
                     Arc::new(EmptyExec::new(true, Arc::clone(&schema))),
                     sort_expr(&schema),
+                    false,
                 )),
             )
             .unwrap(),
@@ -385,6 +387,7 @@ mod tests {
                 Arc::new(DeduplicateExec::new(
                     Arc::new(EmptyExec::new(true, Arc::clone(&schema))),
                     sort_expr(&schema),
+                    false,
                 )),
             )
             .unwrap(),
@@ -423,6 +426,7 @@ mod tests {
                 Arc::new(DeduplicateExec::new(
                     Arc::new(EmptyExec::new(true, Arc::clone(&schema))),
                     sort_expr(&schema),
+                    false,
                 )),
             )
             .unwrap(),
diff --git a/iox_query/src/physical_optimizer/projection_pushdown.rs b/iox_query/src/physical_optimizer/projection_pushdown.rs
index 5a5aa357a8..a465d6af05 100644
--- a/iox_query/src/physical_optimizer/projection_pushdown.rs
+++ b/iox_query/src/physical_optimizer/projection_pushdown.rs
@@ -208,7 +208,11 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
                             child_dedup.sort_keys(),
                             &plan.schema(),
                         )?;
-                        Ok(Arc::new(DeduplicateExec::new(plan, sort_keys)))
+                        Ok(Arc::new(DeduplicateExec::new(
+                            plan,
+                            sort_keys,
+                            child_dedup.use_chunk_order_col(),
+                        )))
                     },
                 )?;
 
@@ -981,6 +985,7 @@ mod tests {
                         ..Default::default()
                     },
                 }],
+                false,
             )),
         )
         .unwrap(),
@@ -1036,6 +1041,7 @@ mod tests {
                     },
                 },
             ],
+            false,
        )),
     )
     .unwrap(),
@@ -1135,6 +1141,7 @@ mod tests {
                 options: Default::default(),
             },
         ],
+        false,
     ));
     let plan =
         Arc::new(FilterExec::try_new(expr_string_cmp("tag2", &plan.schema()), plan).unwrap());
diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs
index e75e619dab..726a3f9df5 100644
--- a/iox_query/src/provider.rs
+++ b/iox_query/src/provider.rs
@@ -941,7 +941,7 @@ impl Deduplicater {
         sort_exprs: Vec<PhysicalSortExpr>,
         input: Arc<dyn ExecutionPlan>,
     ) -> Arc<dyn ExecutionPlan> {
-        Arc::new(DeduplicateExec::new(input, sort_exprs))
+        Arc::new(DeduplicateExec::new(input, sort_exprs, false))
     }
 
     /// Creates a plan that produces output_schema given a plan that
diff --git a/iox_query/src/provider/deduplicate.rs b/iox_query/src/provider/deduplicate.rs
index 9f020ea6f8..71a4a108fd 100644
--- a/iox_query/src/provider/deduplicate.rs
+++ b/iox_query/src/provider/deduplicate.rs
@@ -7,13 +7,15 @@ use std::{collections::HashSet, fmt, sync::Arc};
 use arrow::{error::ArrowError, record_batch::RecordBatch};
 use datafusion_util::{watch::WatchedTask, AdapterStream};
 
+use crate::CHUNK_ORDER_COLUMN_NAME;
+
 use self::algo::get_col_name;
 pub use self::algo::RecordBatchDeduplicator;
 use datafusion::{
     error::{DataFusionError, Result},
     execution::context::TaskContext,
     physical_plan::{
-        expressions::PhysicalSortExpr,
+        expressions::{Column, PhysicalSortExpr},
         metrics::{
             self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, RecordOutput,
         },
@@ -108,15 +110,33 @@ use tokio::sync::mpsc;
 pub struct DeduplicateExec {
     input: Arc<dyn ExecutionPlan>,
     sort_keys: Vec<PhysicalSortExpr>,
+    input_order: Vec<PhysicalSortExpr>,
+    use_chunk_order_col: bool,
 
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
 
 impl DeduplicateExec {
-    pub fn new(input: Arc<dyn ExecutionPlan>, sort_keys: Vec<PhysicalSortExpr>) -> Self {
+    pub fn new(
+        input: Arc<dyn ExecutionPlan>,
+        sort_keys: Vec<PhysicalSortExpr>,
+        use_chunk_order_col: bool,
+    ) -> Self {
+        let mut input_order = sort_keys.clone();
+        if use_chunk_order_col {
+            input_order.push(PhysicalSortExpr {
+                expr: Arc::new(
+                    Column::new_with_schema(CHUNK_ORDER_COLUMN_NAME, &input.schema())
+                        .expect("input has chunk order col"),
+                ),
+                options: Default::default(),
+            })
+        }
         Self {
             input,
             sort_keys,
+            input_order,
+            use_chunk_order_col,
             metrics: ExecutionPlanMetricsSet::new(),
         }
     }
@@ -125,12 +145,17 @@ impl DeduplicateExec {
         &self.sort_keys
     }
 
+    /// Combination of all columns within the sort key and potentially the chunk order column.
     pub fn sort_columns(&self) -> HashSet<&str> {
-        self.sort_keys
+        self.input_order
             .iter()
             .map(|sk| get_col_name(sk.expr.as_ref()))
             .collect()
     }
+
+    pub fn use_chunk_order_col(&self) -> bool {
+        self.use_chunk_order_col
+    }
 }
 
 #[derive(Debug)]
@@ -166,8 +191,7 @@ impl ExecutionPlan for DeduplicateExec {
     }
 
     fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
-        // requires the input to be sorted on the primary key
-        vec![self.output_ordering()]
+        vec![Some(&self.input_order)]
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
@@ -188,7 +212,11 @@ impl ExecutionPlan for DeduplicateExec {
     ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
         assert_eq!(children.len(), 1);
         let input = Arc::clone(&children[0]);
-        Ok(Arc::new(Self::new(input, self.sort_keys.clone())))
+        Ok(Arc::new(Self::new(
+            input,
+            self.sort_keys.clone(),
+            self.use_chunk_order_col,
+        )))
     }
 
     fn execute(
@@ -987,7 +1015,7 @@ mod test {
             },
         }];
 
-        let exec: Arc<dyn ExecutionPlan> = Arc::new(DeduplicateExec::new(input, sort_keys));
+        let exec: Arc<dyn ExecutionPlan> = Arc::new(DeduplicateExec::new(input, sort_keys, false));
 
         test_collect(exec).await;
     }
@@ -1109,7 +1137,7 @@ mod test {
         let input = Arc::new(MemoryExec::try_new(&[input], schema, projection).unwrap());
 
         // Create and run the deduplicator
-        let exec = Arc::new(DeduplicateExec::new(input, sort_keys));
+        let exec = Arc::new(DeduplicateExec::new(input, sort_keys, false));
         let output = test_collect(Arc::clone(&exec) as Arc<dyn ExecutionPlan>).await;
 
         TestResults { output, exec }
diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs
index 373e303c4d..7793cb145b 100644
--- a/iox_query/src/provider/physical.rs
+++ b/iox_query/src/provider/physical.rs
@@ -2,18 +2,20 @@
 
 use crate::{
     provider::record_batch_exec::RecordBatchesExec, util::arrow_sort_key_exprs, QueryChunk,
-    QueryChunkData,
+    QueryChunkData, CHUNK_ORDER_COLUMN_NAME,
 };
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{DataType, Schema as ArrowSchema, SchemaRef};
 use datafusion::{
     datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
-    physical_expr::execution_props::ExecutionProps,
+    physical_expr::{execution_props::ExecutionProps, PhysicalSortExpr},
     physical_plan::{
         empty::EmptyExec,
+        expressions::Column,
         file_format::{FileScanConfig, ParquetExec},
         union::UnionExec,
-        ExecutionPlan, Statistics,
+        ColumnStatistics, ExecutionPlan, Statistics,
     },
+    scalar::ScalarValue,
 };
 use datafusion_util::create_physical_expr_from_schema;
 use object_store::ObjectMeta;
@@ -187,25 +189,49 @@ pub fn chunks_to_physical_nodes(
     }
     let mut parquet_chunks: Vec<_> = parquet_chunks.into_iter().collect();
     parquet_chunks.sort_by_key(|(url_str, _)| url_str.clone());
+    let has_chunk_order_col = schema.field_with_name(CHUNK_ORDER_COLUMN_NAME).is_ok();
     for (_url_str, chunk_list) in parquet_chunks {
         let ParquetChunkList {
             object_store_url,
-            chunks,
+            mut chunks,
             sort_key,
         } = chunk_list;
 
+        // ensure that chunks are actually ordered by chunk order
+        chunks.sort_by_key(|(_meta, c)| c.order());
+
+        let num_rows = chunks
+            .iter()
+            .map(|(_meta, c)| c.summary().total_count() as usize)
+            .sum::<usize>();
+        let chunk_order_min = chunks
+            .iter()
+            .map(|(_meta, c)| c.order().get())
+            .min()
+            .expect("at least one chunk");
+        let chunk_order_max = chunks
+            .iter()
+            .map(|(_meta, c)| c.order().get())
+            .max()
+            .expect("at least one chunk");
+
         let file_groups = distribute(
-            chunks
-                .into_iter()
-                .map(|(object_meta, chunk)| PartitionedFile {
+            chunks.into_iter().map(|(object_meta, chunk)| {
+                let partition_values = if has_chunk_order_col {
+                    vec![ScalarValue::from(chunk.order().get())]
+                } else {
+                    vec![]
+                };
+                PartitionedFile {
                     object_meta,
-                    partition_values: vec![],
+                    partition_values,
                     range: None,
                     extensions: Some(Arc::new(PartitionedFileExt {
                         chunk,
                         output_sort_key_memo: output_sort_key.cloned(),
                     })),
-                }),
+                }
+            }),
             target_partitions,
         );
 
@@ -224,14 +250,76 @@ pub fn chunks_to_physical_nodes(
             }
         });
 
+        let (table_partition_cols, file_schema, output_ordering) = if has_chunk_order_col {
+            let table_partition_cols = vec![(CHUNK_ORDER_COLUMN_NAME.to_owned(), DataType::Int64)];
+            let file_schema = Arc::new(ArrowSchema::new(
+                schema
+                    .fields
+                    .iter()
+                    .filter(|f| f.name() != CHUNK_ORDER_COLUMN_NAME)
+                    .cloned()
+                    .collect(),
+            ));
+            let output_ordering = Some(
+                output_ordering
+                    .unwrap_or_default()
+                    .into_iter()
+                    .chain(std::iter::once(PhysicalSortExpr {
+                        expr: Arc::new(
+                            Column::new_with_schema(CHUNK_ORDER_COLUMN_NAME, schema)
+                                .expect("just added col"),
+                        ),
+                        options: Default::default(),
+                    }))
+                    .collect::<Vec<_>>(),
+            );
+            (table_partition_cols, file_schema, output_ordering)
+        } else {
+            (vec![], Arc::clone(schema), output_ordering)
+        };
+
+        let statistics = Statistics {
+            num_rows: Some(num_rows),
+            total_byte_size: None,
+            column_statistics: Some(
+                schema
+                    .fields
+                    .iter()
+                    .map(|f| {
+                        let null_count = if f.is_nullable() { None } else { Some(0) };
+
+                        let (min_value, max_value) = if f.name() == CHUNK_ORDER_COLUMN_NAME {
+                            (
+                                Some(ScalarValue::from(chunk_order_min)),
+                                Some(ScalarValue::from(chunk_order_max)),
+                            )
+                        } else {
+                            (None, None)
+                        };
+
+                        ColumnStatistics {
+                            null_count,
+                            min_value,
+                            max_value,
+                            distinct_count: None,
+                        }
+                    })
+                    .collect(),
+            ),
+
+            // this does NOT account for predicate pushdown
+            // Also see https://github.com/apache/arrow-datafusion/issues/5614
+            is_exact: false,
+        };
+
         let base_config = FileScanConfig {
             object_store_url,
-            file_schema: Arc::clone(schema),
+            file_schema,
             file_groups,
-            statistics: Statistics::default(),
+            statistics,
             projection: None,
             limit: None,
-            table_partition_cols: vec![],
+            table_partition_cols,
             output_ordering,
             infinite_source: false,
         };
@@ -271,6 +359,7 @@ mod tests {
     use schema::{sort::SortKeyBuilder, SchemaBuilder, TIME_COLUMN_NAME};
 
     use crate::{
+        chunk_order_field,
         test::{format_execution_plan, TestChunk},
         QueryChunkMeta,
     };
@@ -482,4 +571,38 @@ mod tests {
         "###
         );
     }
+
+    #[test]
+    fn test_chunks_to_physical_nodes_mixed_with_chunk_order() {
+        let chunk1 = TestChunk::new("table")
+            .with_tag_column("tag")
+            .with_dummy_parquet_file();
+        let chunk2 = TestChunk::new("table").with_tag_column("tag");
+        let schema = Arc::new(ArrowSchema::new(
+            chunk1
+                .schema()
+                .as_arrow()
+                .fields
+                .iter()
+                .cloned()
+                .chain(std::iter::once(chunk_order_field()))
+                .collect(),
+        ));
+        let plan = chunks_to_physical_nodes(
+            &schema,
+            None,
+            vec![Arc::new(chunk1), Arc::new(chunk2)],
+            Predicate::new(),
+            2,
+        );
+        insta::assert_yaml_snapshot!(
+            format_execution_plan(&plan),
+            @r###"
+        ---
+        - " UnionExec"
+        - "   RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
+        - "   ParquetExec: limit=None, partitions={1 group: [[0.parquet]]}, output_ordering=[__chunk_order@1 ASC], projection=[tag, __chunk_order]"
+        "###
+        );
+    }
+}
diff --git a/iox_query/src/provider/record_batch_exec.rs b/iox_query/src/provider/record_batch_exec.rs
index 3cad6f4cd8..5d1a94ba3e 100644
--- a/iox_query/src/provider/record_batch_exec.rs
+++ b/iox_query/src/provider/record_batch_exec.rs
@@ -1,23 +1,30 @@
 //! Implementation of a DataFusion PhysicalPlan node across partition chunks
-use crate::QueryChunk;
+use crate::{QueryChunk, CHUNK_ORDER_COLUMN_NAME};
 
 use super::adapter::SchemaAdapterStream;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use data_types::TableSummary;
+use data_types::{ColumnSummary, InfluxDbType, TableSummary};
 use datafusion::{
     error::DataFusionError,
     execution::context::TaskContext,
     physical_plan::{
-        expressions::PhysicalSortExpr,
+        expressions::{Column, PhysicalSortExpr},
         memory::MemoryStream,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
     },
+    scalar::ScalarValue,
 };
 use observability_deps::tracing::trace;
 use schema::sort::SortKey;
-use std::{collections::HashSet, fmt, sync::Arc};
+use std::{
+    borrow::Cow,
+    collections::{HashMap, HashSet},
+    fmt,
+    num::NonZeroU64,
+    sync::Arc,
+};
 
 /// Implements the DataFusion physical plan interface for [`RecordBatch`]es with automatic projection and NULL-column creation.
 #[derive(Debug)]
 pub(crate) struct RecordBatchesExec {
@@ -41,6 +48,9 @@ pub(crate) struct RecordBatchesExec {
     ///
     /// [`chunks_to_physical_nodes`]: super::physical::chunks_to_physical_nodes
     output_sort_key_memo: Option<SortKey>,
+
+    /// Output ordering.
+    output_ordering: Option<Vec<PhysicalSortExpr>>,
 }
 
 impl RecordBatchesExec {
@@ -49,7 +59,7 @@ impl RecordBatchesExec {
         schema: SchemaRef,
         output_sort_key_memo: Option<SortKey>,
     ) -> Self {
-        let mut combined_summary_option: Option<TableSummary> = None;
+        let has_chunk_order_col = schema.field_with_name(CHUNK_ORDER_COLUMN_NAME).is_ok();
 
         let chunks: Vec<_> = chunks
             .into_iter()
@@ -58,29 +68,81 @@ impl RecordBatchesExec {
                 .data()
                 .into_record_batches()
                 .expect("chunk must have record batches");
-                let summary = chunk.summary();
-                match combined_summary_option.as_mut() {
-                    None => {
-                        combined_summary_option = Some(summary.as_ref().clone());
-                    }
-                    Some(combined_summary) => {
-                        combined_summary.update_from(&summary);
-                    }
-                }
 
                 (chunk, batches)
             })
             .collect();
 
-        let statistics = combined_summary_option
+        let statistics = chunks
+            .iter()
+            .fold(
+                None,
+                |mut combined_summary: Option<TableSummary>, (chunk, _batches)| {
+                    let summary = chunk.summary();
+
+                    let summary = if has_chunk_order_col {
+                        // add chunk order column
+                        let order = chunk.order().get();
+                        let summary = TableSummary {
+                            columns: summary
+                                .columns
+                                .iter()
+                                .cloned()
+                                .chain(std::iter::once(ColumnSummary {
+                                    name: CHUNK_ORDER_COLUMN_NAME.to_owned(),
+                                    influxdb_type: InfluxDbType::Field,
+                                    stats: data_types::Statistics::I64(data_types::StatValues {
+                                        min: Some(order),
+                                        max: Some(order),
+                                        total_count: summary.total_count(),
+                                        null_count: Some(0),
+                                        distinct_count: Some(NonZeroU64::new(1).unwrap()),
+                                    }),
+                                }))
+                                .collect(),
+                        };
+
+                        Cow::Owned(summary)
+                    } else {
+                        Cow::Borrowed(summary.as_ref())
+                    };
+
+                    match combined_summary.as_mut() {
+                        None => {
+                            combined_summary = Some(summary.into_owned());
+                        }
+                        Some(combined_summary) => {
+                            combined_summary.update_from(&summary);
+                        }
+                    }
+
+                    combined_summary
+                },
+            )
             .map(|combined_summary| crate::statistics::df_from_iox(&schema, &combined_summary))
             .unwrap_or_default();
 
+        let output_ordering = if has_chunk_order_col {
+            Some(vec![
+                // every chunk gets its own partition, so we can claim that the output is ordered
+                PhysicalSortExpr {
+                    expr: Arc::new(
+                        Column::new_with_schema(CHUNK_ORDER_COLUMN_NAME, &schema)
+                            .expect("just checked presence of chunk order col"),
+                    ),
+                    options: Default::default(),
+                },
+            ])
+        } else {
+            None
+        };
+
         Self {
             chunks,
             schema,
             statistics,
             output_sort_key_memo,
+            output_ordering,
             metrics: ExecutionPlanMetricsSet::new(),
         }
     }
@@ -115,8 +177,7 @@ impl ExecutionPlan for RecordBatchesExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        // TODO ??
-        None
+        self.output_ordering.as_deref()
    }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -175,8 +236,12 @@ impl ExecutionPlan for RecordBatchesExec {
             incomplete_output_schema,
             projection,
         )?);
+        let virtual_columns = HashMap::from([(
+            CHUNK_ORDER_COLUMN_NAME,
+            ScalarValue::from(chunk.order().get()),
+        )]);
         let adapter = Box::pin(
-            SchemaAdapterStream::try_new(stream, schema, &Default::default(), baseline_metrics)
+            SchemaAdapterStream::try_new(stream, schema, &virtual_columns, baseline_metrics)
                 .map_err(|e| DataFusionError::Internal(e.to_string()))?,
        );
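
Editor's note: the standalone sketch below is not part of the diff. It illustrates, under stated assumptions, the virtual `__chunk_order` column this change threads through the planner: a non-nullable Int64 field appended to a chunk's schema, where every row of a chunk carries that chunk's constant order value. It assumes only the `arrow` crate; `chunk_order_field` mirrors the helper added to `iox_query/src/lib.rs` above, while `main`, the `tag` column, and the order value 5 are purely illustrative.

// Minimal sketch of the `__chunk_order` virtual column (assumes the `arrow` crate).
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Mirrors `chunk_order_field()` from the diff: a non-nullable Int64 column.
fn chunk_order_field() -> Field {
    Field::new("__chunk_order", DataType::Int64, false)
}

fn main() -> Result<(), arrow::error::ArrowError> {
    // A chunk's user-visible schema ("tag") extended with the virtual order
    // column, much as `chunks_to_physical_nodes` extends the file schema via
    // `table_partition_cols`.
    let schema = Arc::new(Schema::new(vec![
        Field::new("tag", DataType::Utf8, true),
        chunk_order_field(),
    ]));

    // Every row of a given chunk carries that chunk's constant order value
    // (here: 5), so sorting on `__chunk_order` recovers the chunk ordering
    // that `DeduplicateExec` now requires of its input.
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "b"])),
        Arc::new(Int64Array::from(vec![5_i64, 5])),
    ];
    let batch = RecordBatch::try_new(Arc::clone(&schema), columns)?;

    assert_eq!(batch.num_columns(), 2);
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}

Because the value is constant per chunk, the diff never materializes the column in Parquet files or record batches: `ParquetExec` injects it as a partition value and `RecordBatchesExec` as a virtual column in `SchemaAdapterStream`, while per-chunk min/max statistics still let the planner reason about ordering.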