feat: virtual chunk order col (#7240)

* feat: introduce `CHUNK_ORDER_COLUMN_NAME`

* feat: impl `ChunkOrder` everywhere

* feat: `ChunkOrder::get`

* feat: emit chunk order column for `RecordBatchesExec`

* feat: `chunk_order_field`

* feat: chunk order col for parquet chunks

* feat: optional chunk order col handling for dedup

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-03-17 10:39:21 +01:00 committed by GitHub
parent 982e1ab96c
commit 20ec47b00b
17 changed files with 395 additions and 53 deletions

@ -1362,10 +1362,18 @@ impl ChunkOrder {
/// The minimum ordering value a chunk could have. Currently only used in testing.
pub const MIN: Self = Self(0);
/// The maximum chunk order.
pub const MAX: Self = Self(i64::MAX);
/// Create a ChunkOrder from the given value.
pub fn new(order: i64) -> Self {
Self(order)
}
/// Return the underlying order as an integer.
pub fn get(&self) -> i64 {
self.0
}
}
/// `PartitionTemplate` is used to compute the partition key of each row that
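
For illustration only (not part of the diff): a self-contained sketch of the newtype pattern this hunk completes, with a local stand-in for the `data_types::ChunkOrder` type, showing how the new `get` accessor exposes the wrapped value.

// Local stand-in for data_types::ChunkOrder, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ChunkOrder(i64);

impl ChunkOrder {
    /// The minimum ordering value a chunk could have.
    const MIN: Self = Self(0);
    /// The maximum chunk order.
    const MAX: Self = Self(i64::MAX);

    /// Create a ChunkOrder from the given value.
    fn new(order: i64) -> Self {
        Self(order)
    }

    /// Return the underlying order as an integer.
    fn get(&self) -> i64 {
        self.0
    }
}

fn main() {
    let order = ChunkOrder::new(42);
    assert_eq!(order.get(), 42);
    assert!(ChunkOrder::MIN <= order && order <= ChunkOrder::MAX);
}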

@ -772,7 +772,9 @@ mod tests {
let input = Arc::new(MemoryExec::try_new(&[batch], schema, projection).unwrap());
// Create and run the deduplicator
let exec = Arc::new(iox_query::provider::DeduplicateExec::new(input, sort_keys));
let exec = Arc::new(iox_query::provider::DeduplicateExec::new(
input, sort_keys, false,
));
let got = test_collect(Arc::clone(&exec) as Arc<dyn ExecutionPlan>).await;
assert_batches_eq!(expect, &*got);

@ -222,7 +222,7 @@ impl QueryChunk for QueryAdaptor {
}
fn order(&self) -> ChunkOrder {
unimplemented!()
ChunkOrder::MAX
}
fn as_any(&self) -> &dyn Any {

@ -604,7 +604,7 @@ mod tests {
let input = Arc::new(MemoryExec::try_new(&[batch], schema, projection).unwrap());
// Create and run the deduplicator
let exec = Arc::new(iox_query::provider::DeduplicateExec::new(input, sort_keys));
let exec = Arc::new(iox_query::provider::DeduplicateExec::new(input, sort_keys, false));
let got = test_collect(Arc::clone(&exec) as Arc<dyn ExecutionPlan>).await;
assert_batches_eq!(expect, &*got);

@ -201,7 +201,7 @@ impl QueryChunk for QueryAdaptor {
}
fn order(&self) -> ChunkOrder {
unimplemented!()
ChunkOrder::MAX
}
fn as_any(&self) -> &dyn Any {

@ -10,7 +10,10 @@
clippy::dbg_macro
)]
use arrow::record_batch::RecordBatch;
use arrow::{
datatypes::{DataType, Field},
record_batch::RecordBatch,
};
use async_trait::async_trait;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary};
use datafusion::{error::DataFusionError, prelude::SessionContext};
@ -39,6 +42,14 @@ pub mod util;
pub use frontend::common::ScanPlanBuilder;
pub use query_functions::group_by::{Aggregate, WindowDuration};
/// The name of the virtual column that represents the chunk order.
pub const CHUNK_ORDER_COLUMN_NAME: &str = "__chunk_order";
/// Generate [`Field`] for [chunk order column](CHUNK_ORDER_COLUMN_NAME).
pub fn chunk_order_field() -> Field {
Field::new(CHUNK_ORDER_COLUMN_NAME, DataType::Int64, false)
}
/// Trait for an object (designed to be a Chunk) which can provide
/// metadata
pub trait QueryChunkMeta {
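
For reference, a minimal sketch of the new helper in isolation, assuming only the arrow crate; `CHUNK_ORDER_COLUMN_NAME` and `chunk_order_field` are redefined locally to mirror the definitions above.

use arrow::datatypes::{DataType, Field};

// Mirrors the constant introduced in this commit.
const CHUNK_ORDER_COLUMN_NAME: &str = "__chunk_order";

// Mirrors the helper introduced in this commit: a non-nullable Int64 field.
fn chunk_order_field() -> Field {
    Field::new(CHUNK_ORDER_COLUMN_NAME, DataType::Int64, false)
}

fn main() {
    let field = chunk_order_field();
    assert_eq!(field.name(), CHUNK_ORDER_COLUMN_NAME);
    assert_eq!(field.data_type(), &DataType::Int64);
    assert!(!field.is_nullable());
}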

@ -73,7 +73,11 @@ impl PhysicalOptimizerRule for DedupNullColumns {
);
let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema);
return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs))));
return Ok(Some(Arc::new(DeduplicateExec::new(
child,
sort_exprs,
dedup_exec.use_chunk_order_col(),
))));
}
Ok(None)
@ -95,7 +99,7 @@ mod tests {
use crate::{
physical_optimizer::{
dedup::test_util::{chunk, dedup_plan},
dedup::test_util::{chunk, dedup_plan, dedup_plan_with_chunk_order_col},
test_util::OptimizationTest,
},
test::TestChunk,
@ -147,6 +151,29 @@ mod tests {
);
}
#[test]
fn test_single_chunk_schema_has_chunk_order_col() {
let chunk = chunk(1).with_dummy_parquet_file();
let schema = chunk.schema().clone();
let plan = dedup_plan_with_chunk_order_col(schema, vec![chunk]);
let opt = DedupNullColumns::default();
insta::assert_yaml_snapshot!(
OptimizationTest::new(plan, opt),
@r###"
---
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " ParquetExec: limit=None, partitions={1 group: [[1.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
output:
Ok:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " ParquetExec: limit=None, partitions={1 group: [[1.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
}
#[test]
fn test_single_chunk_misses_pk_cols() {
let chunk = TestChunk::new("table")

@ -15,6 +15,7 @@ use crate::{
physical_optimizer::chunk_extraction::extract_chunks,
provider::{chunks_to_physical_nodes, DeduplicateExec},
util::arrow_sort_key_exprs,
CHUNK_ORDER_COLUMN_NAME,
};
/// Determine sort key order of [`DeduplicateExec`].
@ -80,6 +81,7 @@ impl PhysicalOptimizerRule for DedupSortOrder {
let mut quorum_sort_key_builder = SortKeyBuilder::default();
let mut todo_pk_columns = dedup_exec.sort_columns();
todo_pk_columns.remove(CHUNK_ORDER_COLUMN_NAME);
while !todo_pk_columns.is_empty() {
let candidate_counts = todo_pk_columns.iter().copied().map(|col| {
let count = chunk_sort_keys
@ -133,7 +135,11 @@ impl PhysicalOptimizerRule for DedupSortOrder {
);
let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &schema);
return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs))));
return Ok(Some(Arc::new(DeduplicateExec::new(
child,
sort_exprs,
dedup_exec.use_chunk_order_col(),
))));
}
Ok(None)
@ -165,7 +171,7 @@ mod tests {
use crate::{
physical_optimizer::{
dedup::test_util::{chunk, dedup_plan},
dedup::test_util::{chunk, dedup_plan, dedup_plan_with_chunk_order_col},
test_util::OptimizationTest,
},
test::TestChunk,
@ -246,6 +252,35 @@ mod tests {
);
}
#[test]
fn test_single_chunk_with_chunk_order_col() {
let chunk = chunk(1)
.with_dummy_parquet_file()
.with_sort_key(SortKey::from_columns([
Arc::from("tag2"),
Arc::from("tag1"),
Arc::from(TIME_COLUMN_NAME),
]));
let schema = chunk.schema().clone();
let plan = dedup_plan_with_chunk_order_col(schema, vec![chunk]);
let opt = DedupSortOrder::default();
insta::assert_yaml_snapshot!(
OptimizationTest::new(plan, opt),
@r###"
---
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " ParquetExec: limit=None, partitions={1 group: [[1.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
output:
Ok:
- " DeduplicateExec: [tag2@2 ASC,tag1@1 ASC,time@3 ASC]"
- " UnionExec"
- " ParquetExec: limit=None, partitions={1 group: [[1.parquet]]}, output_ordering=[tag2@2 ASC, tag1@1 ASC, time@3 ASC, __chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
}
#[test]
fn test_unusual_time_order() {
let chunk = chunk(1)
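
To make the new guard concrete, a plain-Rust sketch (illustrative column names, not IOx types) of how the virtual column is dropped from the set of primary-key columns that still need a slot in the quorum sort key.

use std::collections::HashSet;

const CHUNK_ORDER_COLUMN_NAME: &str = "__chunk_order";

// The chunk order column is virtual and never part of the data's sort key,
// so it must not compete for a position in the quorum sort key.
fn pk_columns_to_order<'a>(sort_columns: &HashSet<&'a str>) -> HashSet<&'a str> {
    let mut todo = sort_columns.clone();
    todo.remove(CHUNK_ORDER_COLUMN_NAME);
    todo
}

fn main() {
    let sort_columns: HashSet<&str> =
        ["tag1", "tag2", "time", CHUNK_ORDER_COLUMN_NAME].into_iter().collect();
    let todo = pk_columns_to_order(&sort_columns);
    assert!(!todo.contains(CHUNK_ORDER_COLUMN_NAME));
    assert_eq!(todo.len(), 3);
}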

@ -89,6 +89,7 @@ impl PhysicalOptimizerRule for PartitionSplit {
config.execution.target_partitions,
),
dedup_exec.sort_keys().to_vec(),
dedup_exec.use_chunk_order_col(),
)) as _
})
.collect(),

@ -1,10 +1,12 @@
use std::sync::Arc;
use arrow::datatypes::Schema as ArrowSchema;
use datafusion::physical_plan::ExecutionPlan;
use predicate::Predicate;
use schema::Schema;
use crate::{
chunk_order_field,
provider::{chunks_to_physical_nodes, DeduplicateExec},
test::TestChunk,
util::arrow_sort_key_exprs,
@ -12,15 +14,43 @@ use crate::{
};
pub fn dedup_plan(schema: Schema, chunks: Vec<TestChunk>) -> Arc<dyn ExecutionPlan> {
dedup_plan_impl(schema, chunks, false)
}
pub fn dedup_plan_with_chunk_order_col(
schema: Schema,
chunks: Vec<TestChunk>,
) -> Arc<dyn ExecutionPlan> {
dedup_plan_impl(schema, chunks, true)
}
fn dedup_plan_impl(
schema: Schema,
chunks: Vec<TestChunk>,
use_chunk_order_col: bool,
) -> Arc<dyn ExecutionPlan> {
let chunks = chunks
.into_iter()
.map(|c| Arc::new(c) as _)
.collect::<Vec<Arc<dyn QueryChunk>>>();
let plan = chunks_to_physical_nodes(&schema.as_arrow(), None, chunks, Predicate::new(), 2);
let arrow_schema = if use_chunk_order_col {
Arc::new(ArrowSchema::new(
schema
.as_arrow()
.fields
.iter()
.cloned()
.chain(std::iter::once(chunk_order_field()))
.collect(),
))
} else {
schema.as_arrow()
};
let plan = chunks_to_physical_nodes(&arrow_schema, None, chunks, Predicate::new(), 2);
let sort_key = schema::sort::SortKey::from_columns(schema.primary_key());
let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema.as_arrow());
Arc::new(DeduplicateExec::new(plan, sort_exprs))
let sort_exprs = arrow_sort_key_exprs(&sort_key, &plan.schema());
Arc::new(DeduplicateExec::new(plan, sort_exprs, use_chunk_order_col))
}
pub fn chunk(id: u128) -> TestChunk {
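
A version-agnostic sketch of the schema extension `dedup_plan_impl` performs when `use_chunk_order_col` is set: the virtual column is appended after the existing fields. Only the arrow crate is assumed; `chunk_order_field` is redefined locally and `Schema::try_merge` stands in for the field chaining in the diff.

use arrow::datatypes::{DataType, Field, Schema, TimeUnit};

fn chunk_order_field() -> Field {
    Field::new("__chunk_order", DataType::Int64, false)
}

// Append the virtual column to the end of the schema, as the test helper does.
fn with_chunk_order_col(schema: Schema) -> Schema {
    Schema::try_merge([schema, Schema::new(vec![chunk_order_field()])])
        .expect("field names are disjoint")
}

fn main() {
    let base = Schema::new(vec![
        Field::new("tag1", DataType::Utf8, true),
        Field::new("time", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
    ]);
    let extended = with_chunk_order_col(base);
    assert!(extended.field_with_name("__chunk_order").is_ok());
    assert_eq!(extended.fields().len(), 3);
}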

@ -76,6 +76,7 @@ impl PhysicalOptimizerRule for TimeSplit {
config.execution.target_partitions,
),
dedup_exec.sort_keys().to_vec(),
dedup_exec.use_chunk_order_col(),
)) as _
})
.collect(),

@ -104,6 +104,7 @@ impl PhysicalOptimizerRule for PredicatePushdown {
grandchild,
)?),
child_dedup.sort_keys().to_vec(),
child_dedup.use_chunk_order_col(),
));
if !no_pushdown.is_empty() {
new_node = Arc::new(FilterExec::try_new(
@ -354,6 +355,7 @@ mod tests {
Arc::new(DeduplicateExec::new(
Arc::new(EmptyExec::new(true, Arc::clone(&schema))),
sort_expr(&schema),
false,
)),
)
.unwrap(),
@ -385,6 +387,7 @@ mod tests {
Arc::new(DeduplicateExec::new(
Arc::new(EmptyExec::new(true, Arc::clone(&schema))),
sort_expr(&schema),
false,
)),
)
.unwrap(),
@ -423,6 +426,7 @@ mod tests {
Arc::new(DeduplicateExec::new(
Arc::new(EmptyExec::new(true, Arc::clone(&schema))),
sort_expr(&schema),
false,
)),
)
.unwrap(),

@ -208,7 +208,11 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
child_dedup.sort_keys(),
&plan.schema(),
)?;
Ok(Arc::new(DeduplicateExec::new(plan, sort_keys)))
Ok(Arc::new(DeduplicateExec::new(
plan,
sort_keys,
child_dedup.use_chunk_order_col(),
)))
},
)?;
@ -981,6 +985,7 @@ mod tests {
..Default::default()
},
}],
false,
)),
)
.unwrap(),
@ -1036,6 +1041,7 @@ mod tests {
},
},
],
false,
)),
)
.unwrap(),
@ -1135,6 +1141,7 @@ mod tests {
options: Default::default(),
},
],
false,
));
let plan =
Arc::new(FilterExec::try_new(expr_string_cmp("tag2", &plan.schema()), plan).unwrap());

@ -941,7 +941,7 @@ impl Deduplicater {
sort_exprs: Vec<PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
Arc::new(DeduplicateExec::new(input, sort_exprs))
Arc::new(DeduplicateExec::new(input, sort_exprs, false))
}
/// Creates a plan that produces output_schema given a plan that

@ -7,13 +7,15 @@ use std::{collections::HashSet, fmt, sync::Arc};
use arrow::{error::ArrowError, record_batch::RecordBatch};
use datafusion_util::{watch::WatchedTask, AdapterStream};
use crate::CHUNK_ORDER_COLUMN_NAME;
use self::algo::get_col_name;
pub use self::algo::RecordBatchDeduplicator;
use datafusion::{
error::{DataFusionError, Result},
execution::context::TaskContext,
physical_plan::{
expressions::PhysicalSortExpr,
expressions::{Column, PhysicalSortExpr},
metrics::{
self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, RecordOutput,
},
@ -108,15 +110,33 @@ use tokio::sync::mpsc;
pub struct DeduplicateExec {
input: Arc<dyn ExecutionPlan>,
sort_keys: Vec<PhysicalSortExpr>,
input_order: Vec<PhysicalSortExpr>,
use_chunk_order_col: bool,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
impl DeduplicateExec {
pub fn new(input: Arc<dyn ExecutionPlan>, sort_keys: Vec<PhysicalSortExpr>) -> Self {
pub fn new(
input: Arc<dyn ExecutionPlan>,
sort_keys: Vec<PhysicalSortExpr>,
use_chunk_order_col: bool,
) -> Self {
let mut input_order = sort_keys.clone();
if use_chunk_order_col {
input_order.push(PhysicalSortExpr {
expr: Arc::new(
Column::new_with_schema(CHUNK_ORDER_COLUMN_NAME, &input.schema())
.expect("input has chunk order col"),
),
options: Default::default(),
})
}
Self {
input,
sort_keys,
input_order,
use_chunk_order_col,
metrics: ExecutionPlanMetricsSet::new(),
}
}
@ -125,12 +145,17 @@ impl DeduplicateExec {
&self.sort_keys
}
/// Combination of all columns within the sort key and potentially the chunk order column.
pub fn sort_columns(&self) -> HashSet<&str> {
self.sort_keys
self.input_order
.iter()
.map(|sk| get_col_name(sk.expr.as_ref()))
.collect()
}
pub fn use_chunk_order_col(&self) -> bool {
self.use_chunk_order_col
}
}
#[derive(Debug)]
@ -166,8 +191,7 @@ impl ExecutionPlan for DeduplicateExec {
}
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
// requires the input to be sorted on the primary key (and, when enabled, the chunk order column)
vec![self.output_ordering()]
vec![Some(&self.input_order)]
}
fn maintains_input_order(&self) -> Vec<bool> {
@ -188,7 +212,11 @@ impl ExecutionPlan for DeduplicateExec {
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
assert_eq!(children.len(), 1);
let input = Arc::clone(&children[0]);
Ok(Arc::new(Self::new(input, self.sort_keys.clone())))
Ok(Arc::new(Self::new(
input,
self.sort_keys.clone(),
self.use_chunk_order_col,
)))
}
fn execute(
@ -987,7 +1015,7 @@ mod test {
},
}];
let exec: Arc<dyn ExecutionPlan> = Arc::new(DeduplicateExec::new(input, sort_keys));
let exec: Arc<dyn ExecutionPlan> = Arc::new(DeduplicateExec::new(input, sort_keys, false));
test_collect(exec).await;
}
@ -1109,7 +1137,7 @@ mod test {
let input = Arc::new(MemoryExec::try_new(&[input], schema, projection).unwrap());
// Create and run the deduplicator
let exec = Arc::new(DeduplicateExec::new(input, sort_keys));
let exec = Arc::new(DeduplicateExec::new(input, sort_keys, false));
let output = test_collect(Arc::clone(&exec) as Arc<dyn ExecutionPlan>).await;
TestResults { output, exec }
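
A plain-Rust sketch (no DataFusion types, names illustrative) of how `DeduplicateExec::new` derives its required input ordering above: the dedup sort key, optionally extended by the virtual chunk order column. Deduplication itself still compares rows only on the sort keys; the extra column only constrains the order in which duplicate rows arrive from the input.

const CHUNK_ORDER_COLUMN_NAME: &str = "__chunk_order";

fn required_input_order(sort_keys: &[&str], use_chunk_order_col: bool) -> Vec<String> {
    let mut input_order: Vec<String> = sort_keys.iter().map(|s| s.to_string()).collect();
    if use_chunk_order_col {
        // the required input ordering gains the virtual column as its last key
        input_order.push(CHUNK_ORDER_COLUMN_NAME.to_string());
    }
    input_order
}

fn main() {
    assert_eq!(
        required_input_order(&["tag1", "time"], true),
        vec!["tag1", "time", "__chunk_order"]
    );
    assert_eq!(
        required_input_order(&["tag1", "time"], false),
        vec!["tag1", "time"]
    );
}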

@ -2,18 +2,20 @@
use crate::{
provider::record_batch_exec::RecordBatchesExec, util::arrow_sort_key_exprs, QueryChunk,
QueryChunkData,
QueryChunkData, CHUNK_ORDER_COLUMN_NAME,
};
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{DataType, Schema as ArrowSchema, SchemaRef};
use datafusion::{
datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
physical_expr::execution_props::ExecutionProps,
physical_expr::{execution_props::ExecutionProps, PhysicalSortExpr},
physical_plan::{
empty::EmptyExec,
expressions::Column,
file_format::{FileScanConfig, ParquetExec},
union::UnionExec,
ExecutionPlan, Statistics,
ColumnStatistics, ExecutionPlan, Statistics,
},
scalar::ScalarValue,
};
use datafusion_util::create_physical_expr_from_schema;
use object_store::ObjectMeta;
@ -187,25 +189,49 @@ pub fn chunks_to_physical_nodes(
}
let mut parquet_chunks: Vec<_> = parquet_chunks.into_iter().collect();
parquet_chunks.sort_by_key(|(url_str, _)| url_str.clone());
let has_chunk_order_col = schema.field_with_name(CHUNK_ORDER_COLUMN_NAME).is_ok();
for (_url_str, chunk_list) in parquet_chunks {
let ParquetChunkList {
object_store_url,
chunks,
mut chunks,
sort_key,
} = chunk_list;
// ensure that chunks are actually ordered by chunk order
chunks.sort_by_key(|(_meta, c)| c.order());
let num_rows = chunks
.iter()
.map(|(_meta, c)| c.summary().total_count() as usize)
.sum::<usize>();
let chunk_order_min = chunks
.iter()
.map(|(_meta, c)| c.order().get())
.min()
.expect("at least one chunk");
let chunk_order_max = chunks
.iter()
.map(|(_meta, c)| c.order().get())
.max()
.expect("at least one chunk");
let file_groups = distribute(
chunks
.into_iter()
.map(|(object_meta, chunk)| PartitionedFile {
chunks.into_iter().map(|(object_meta, chunk)| {
let partition_values = if has_chunk_order_col {
vec![ScalarValue::from(chunk.order().get())]
} else {
vec![]
};
PartitionedFile {
object_meta,
partition_values: vec![],
partition_values,
range: None,
extensions: Some(Arc::new(PartitionedFileExt {
chunk,
output_sort_key_memo: output_sort_key.cloned(),
})),
}),
}
}),
target_partitions,
);
@ -224,14 +250,76 @@ pub fn chunks_to_physical_nodes(
}
});
let (table_partition_cols, file_schema, output_ordering) = if has_chunk_order_col {
let table_partition_cols = vec![(CHUNK_ORDER_COLUMN_NAME.to_owned(), DataType::Int64)];
let file_schema = Arc::new(ArrowSchema::new(
schema
.fields
.iter()
.filter(|f| f.name() != CHUNK_ORDER_COLUMN_NAME)
.cloned()
.collect(),
));
let output_ordering = Some(
output_ordering
.unwrap_or_default()
.into_iter()
.chain(std::iter::once(PhysicalSortExpr {
expr: Arc::new(
Column::new_with_schema(CHUNK_ORDER_COLUMN_NAME, schema)
.expect("just added col"),
),
options: Default::default(),
}))
.collect::<Vec<_>>(),
);
(table_partition_cols, file_schema, output_ordering)
} else {
(vec![], Arc::clone(schema), output_ordering)
};
let statistics = Statistics {
num_rows: Some(num_rows),
total_byte_size: None,
column_statistics: Some(
schema
.fields
.iter()
.map(|f| {
let null_count = if f.is_nullable() { None } else { Some(0) };
let (min_value, max_value) = if f.name() == CHUNK_ORDER_COLUMN_NAME {
(
Some(ScalarValue::from(chunk_order_min)),
Some(ScalarValue::from(chunk_order_max)),
)
} else {
(None, None)
};
ColumnStatistics {
null_count,
min_value,
max_value,
distinct_count: None,
}
})
.collect(),
),
// this does NOT account for predicate pushdown
// Also see https://github.com/apache/arrow-datafusion/issues/5614
is_exact: false,
};
let base_config = FileScanConfig {
object_store_url,
file_schema: Arc::clone(schema),
file_schema,
file_groups,
statistics: Statistics::default(),
statistics,
projection: None,
limit: None,
table_partition_cols: vec![],
table_partition_cols,
output_ordering,
infinite_source: false,
};
@ -271,6 +359,7 @@ mod tests {
use schema::{sort::SortKeyBuilder, SchemaBuilder, TIME_COLUMN_NAME};
use crate::{
chunk_order_field,
test::{format_execution_plan, TestChunk},
QueryChunkMeta,
};
@ -482,4 +571,38 @@ mod tests {
"###
);
}
#[test]
fn test_chunks_to_physical_nodes_mixed_with_chunk_order() {
let chunk1 = TestChunk::new("table")
.with_tag_column("tag")
.with_dummy_parquet_file();
let chunk2 = TestChunk::new("table").with_tag_column("tag");
let schema = Arc::new(ArrowSchema::new(
chunk1
.schema()
.as_arrow()
.fields
.iter()
.cloned()
.chain(std::iter::once(chunk_order_field()))
.collect(),
));
let plan = chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk1), Arc::new(chunk2)],
Predicate::new(),
2,
);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[0.parquet]]}, output_ordering=[__chunk_order@1 ASC], projection=[tag, __chunk_order]"
"###
);
}
}
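
For illustration, a plain-Rust sketch of the bookkeeping `chunks_to_physical_nodes` gains when the schema carries the virtual column: files are sorted by chunk order, each file gets its chunk's order as a partition value, and the min/max of the orders feed the column statistics. `ChunkInfo` is a local stand-in, not an IOx type.

struct ChunkInfo {
    file: &'static str,
    order: i64,
}

fn chunk_order_stats(mut chunks: Vec<ChunkInfo>) -> (Vec<(&'static str, i64)>, i64, i64) {
    // ensure that files are actually ordered by chunk order, as the diff does
    chunks.sort_by_key(|c| c.order);
    let min = chunks.iter().map(|c| c.order).min().expect("at least one chunk");
    let max = chunks.iter().map(|c| c.order).max().expect("at least one chunk");
    // per-file "partition value" that the scan exposes as __chunk_order
    let partition_values = chunks.into_iter().map(|c| (c.file, c.order)).collect();
    (partition_values, min, max)
}

fn main() {
    let chunks = vec![
        ChunkInfo { file: "2.parquet", order: 7 },
        ChunkInfo { file: "1.parquet", order: 3 },
    ];
    let (files, min, max) = chunk_order_stats(chunks);
    assert_eq!(files, vec![("1.parquet", 3), ("2.parquet", 7)]);
    assert_eq!((min, max), (3, 7));
}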

@ -1,23 +1,30 @@
//! Implementation of a DataFusion PhysicalPlan node across partition chunks
use crate::QueryChunk;
use crate::{QueryChunk, CHUNK_ORDER_COLUMN_NAME};
use super::adapter::SchemaAdapterStream;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use data_types::TableSummary;
use data_types::{ColumnSummary, InfluxDbType, TableSummary};
use datafusion::{
error::DataFusionError,
execution::context::TaskContext,
physical_plan::{
expressions::PhysicalSortExpr,
expressions::{Column, PhysicalSortExpr},
memory::MemoryStream,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
},
scalar::ScalarValue,
};
use observability_deps::tracing::trace;
use schema::sort::SortKey;
use std::{collections::HashSet, fmt, sync::Arc};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fmt,
num::NonZeroU64,
sync::Arc,
};
/// Implements the DataFusion physical plan interface for [`RecordBatch`]es with automatic projection and NULL-column creation.
#[derive(Debug)]
@ -41,6 +48,9 @@ pub(crate) struct RecordBatchesExec {
///
/// [`chunks_to_physical_nodes`]: super::physical::chunks_to_physical_nodes
output_sort_key_memo: Option<SortKey>,
/// Output ordering.
output_ordering: Option<Vec<PhysicalSortExpr>>,
}
impl RecordBatchesExec {
@ -49,7 +59,7 @@ impl RecordBatchesExec {
schema: SchemaRef,
output_sort_key_memo: Option<SortKey>,
) -> Self {
let mut combined_summary_option: Option<TableSummary> = None;
let has_chunk_order_col = schema.field_with_name(CHUNK_ORDER_COLUMN_NAME).is_ok();
let chunks: Vec<_> = chunks
.into_iter()
@ -58,29 +68,81 @@ impl RecordBatchesExec {
.data()
.into_record_batches()
.expect("chunk must have record batches");
let summary = chunk.summary();
match combined_summary_option.as_mut() {
None => {
combined_summary_option = Some(summary.as_ref().clone());
}
Some(combined_summary) => {
combined_summary.update_from(&summary);
}
}
(chunk, batches)
})
.collect();
let statistics = combined_summary_option
let statistics = chunks
.iter()
.fold(
None,
|mut combined_summary: Option<TableSummary>, (chunk, _batches)| {
let summary = chunk.summary();
let summary = if has_chunk_order_col {
// add chunk order column
let order = chunk.order().get();
let summary = TableSummary {
columns: summary
.columns
.iter()
.cloned()
.chain(std::iter::once(ColumnSummary {
name: CHUNK_ORDER_COLUMN_NAME.to_owned(),
influxdb_type: InfluxDbType::Field,
stats: data_types::Statistics::I64(data_types::StatValues {
min: Some(order),
max: Some(order),
total_count: summary.total_count(),
null_count: Some(0),
distinct_count: Some(NonZeroU64::new(1).unwrap()),
}),
}))
.collect(),
};
Cow::Owned(summary)
} else {
Cow::Borrowed(summary.as_ref())
};
match combined_summary.as_mut() {
None => {
combined_summary = Some(summary.into_owned());
}
Some(combined_summary) => {
combined_summary.update_from(&summary);
}
}
combined_summary
},
)
.map(|combined_summary| crate::statistics::df_from_iox(&schema, &combined_summary))
.unwrap_or_default();
let output_ordering = if has_chunk_order_col {
Some(vec![
// every chunk gets its own partition, so we can claim that the output is ordered
PhysicalSortExpr {
expr: Arc::new(
Column::new_with_schema(CHUNK_ORDER_COLUMN_NAME, &schema)
.expect("just checked presence of chunk order col"),
),
options: Default::default(),
},
])
} else {
None
};
Self {
chunks,
schema,
statistics,
output_sort_key_memo,
output_ordering,
metrics: ExecutionPlanMetricsSet::new(),
}
}
@ -115,8 +177,7 @@ impl ExecutionPlan for RecordBatchesExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
// TODO ??
None
self.output_ordering.as_deref()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@ -175,8 +236,12 @@ impl ExecutionPlan for RecordBatchesExec {
incomplete_output_schema,
projection,
)?);
let virtual_columns = HashMap::from([(
CHUNK_ORDER_COLUMN_NAME,
ScalarValue::from(chunk.order().get()),
)]);
let adapter = Box::pin(
SchemaAdapterStream::try_new(stream, schema, &Default::default(), baseline_metrics)
SchemaAdapterStream::try_new(stream, schema, &virtual_columns, baseline_metrics)
.map_err(|e| DataFusionError::Internal(e.to_string()))?,
);
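
Finally, an illustrative sketch (local types, not the IOx `TableSummary`) of the statistics `RecordBatchesExec` now synthesizes for the virtual column and how they combine across chunks: within one chunk the order is constant, so min equals max and there are no NULLs; across chunks the ranges are merged.

#[derive(Debug, Clone, Copy, PartialEq)]
struct OrderStats {
    min: i64,
    max: i64,
    null_count: u64,
}

// Per-chunk statistics: the chunk order is a single constant value.
fn per_chunk(order: i64) -> OrderStats {
    OrderStats { min: order, max: order, null_count: 0 }
}

// Combining across chunks widens the min/max range and sums NULL counts.
fn combine(a: OrderStats, b: OrderStats) -> OrderStats {
    OrderStats {
        min: a.min.min(b.min),
        max: a.max.max(b.max),
        null_count: a.null_count + b.null_count,
    }
}

fn main() {
    let combined = [3, 7, 5]
        .into_iter()
        .map(per_chunk)
        .reduce(combine)
        .expect("at least one chunk");
    assert_eq!(combined, OrderStats { min: 3, max: 7, null_count: 0 });
}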