chore: Update datafusion (#7100)

* chore: Update datafusion

* chore: iox_query to compile for API changes + update tests

* chore: Run cargo hakari tasks

---------

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Andrew Lamb 2023-03-06 18:59:24 +01:00 committed by GitHub
parent 87ae7e72cd
commit ed0704ac8d
17 changed files with 138 additions and 72 deletions

Cargo.lock (generated)
@@ -1417,7 +1417,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12#ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6#03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -1464,7 +1464,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12#ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6#03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6"
dependencies = [
"arrow",
"chrono",
@@ -1477,7 +1477,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12#ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6#03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -1489,7 +1489,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12#ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6#03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6"
dependencies = [
"arrow",
"async-trait",
@@ -1506,7 +1506,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12#ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6#03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -1536,7 +1536,7 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12#ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6#03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6"
dependencies = [
"arrow",
"chrono",
@@ -1553,7 +1553,7 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12#ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6#03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6"
dependencies = [
"arrow",
"datafusion-common",
@@ -1564,7 +1564,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "19.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12#ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6#03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6"
dependencies = [
"arrow-schema",
"datafusion-common",

@@ -117,8 +117,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "34.0.0" }
arrow-flight = { version = "34.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6" }
hashbrown = { version = "0.13.2" }
parquet = { version = "34.0.0" }

@@ -20,12 +20,14 @@ use std::task::{Context, Poll};
use datafusion::arrow::array::BooleanArray;
use datafusion::arrow::compute::filter_record_batch;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::DataFusionError;
use datafusion::common::{DataFusionError, ToDFSchema};
use datafusion::datasource::MemTable;
use datafusion::execution::context::TaskContext;
use datafusion::execution::memory_pool::UnboundedMemoryPool;
use datafusion::logical_expr::expr::Sort;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::common::SizedRecordBatchStream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
use datafusion::physical_plan::{collect, EmptyRecordBatchStream, ExecutionPlan};
@@ -367,6 +369,26 @@ pub fn nullable_schema(schema: SchemaRef) -> SchemaRef {
}
}

/// Returns a [`PhysicalExpr`] from the logical [`Expr`] and Arrow [`SchemaRef`]
pub fn create_physical_expr_from_schema(
    props: &ExecutionProps,
    expr: &Expr,
    schema: &SchemaRef,
) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
    let df_schema = Arc::clone(schema).to_dfschema_ref()?;
    create_physical_expr(expr, df_schema.as_ref(), schema.as_ref(), props)
}

/// Returns a [`PruningPredicate`] from the logical [`Expr`] and Arrow [`SchemaRef`]
pub fn create_pruning_predicate(
    props: &ExecutionProps,
    expr: &Expr,
    schema: &SchemaRef,
) -> Result<PruningPredicate, DataFusionError> {
    let expr = create_physical_expr_from_schema(props, expr, schema)?;
    PruningPredicate::try_new(expr, Arc::clone(schema))
}
#[cfg(test)]
mod tests {
use datafusion::arrow::datatypes::{DataType, Field};
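
The two helpers added above give callers one place to lower a logical `Expr` into either an executable `PhysicalExpr` or a statistics-based `PruningPredicate`, both of which require an `ExecutionProps` at this DataFusion revision. A minimal usage sketch (the `host` column and schema are illustrative, not taken from this diff):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::DataFusionError;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::prelude::{col, lit};
use datafusion_util::{create_physical_expr_from_schema, create_pruning_predicate};

fn main() -> Result<(), DataFusionError> {
    // Hypothetical single-column schema; any Arrow schema works here.
    let schema = Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)]));
    let props = ExecutionProps::new();
    let expr = col("host").eq(lit("a"));

    // Logical `Expr` -> executable `PhysicalExpr`, resolved against the schema.
    let physical = create_physical_expr_from_schema(&props, &expr, &schema)?;
    println!("physical expression: {physical}");

    // The same expression wrapped as a `PruningPredicate` over min/max statistics.
    let _pruning = create_pruning_predicate(&props, &expr, &schema)?;
    Ok(())
}
```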

@@ -39,8 +39,8 @@
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag@2 = A, pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag@2 = A, pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | |
----------
-- SQL: SELECT * FROM "table" WHERE foo=1 AND bar=2;
@@ -88,8 +88,8 @@
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time = TimestampNanosecond(0, None), pruning_predicate=time_min@0 <= 0 AND 0 <= time_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=time = TimestampNanosecond(0, None), pruning_predicate=time_min@0 <= 0 AND 0 <= time_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@3 = 0, pruning_predicate=time_min@0 <= 0 AND 0 <= time_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=time@3 = 0, pruning_predicate=time_min@0 <= 0 AND 0 <= time_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | |
----------
-- SQL: SELECT * FROM "table" WHERE tag='A' AND foo=1 AND time=to_timestamp('1970-01-01T00:00:00.000000000+00:00');
@@ -110,7 +110,7 @@
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag@2 = A AND time@3 = 0, pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag@2 = A AND time@3 = 0, pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | |
----------

@@ -40,7 +40,7 @@
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag@2 = A, pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: expr=[tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | |
@@ -91,7 +91,7 @@
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time = TimestampNanosecond(0, None), pruning_predicate=time_min@0 <= 0 AND 0 <= time_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@3 = 0, pruning_predicate=time_min@0 <= 0 AND 0 <= time_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: expr=[tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | |
@@ -114,7 +114,7 @@
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag@2 = A AND time@3 = 0, pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: expr=[tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | |

@@ -111,8 +111,8 @@
| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC], metrics=[elapsed_compute=1.234ms, mem_used=0, num_dupes=2, output_rows=5, spill_count=0, spilled_bytes=0] |
| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC], metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=7, spill_count=0, spilled_bytes=0] |
| | UnionExec, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=7, spill_count=0, spilled_bytes=0] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=state = Dictionary(Int32, Utf8("MA")), pruning_predicate=state_min@0 <= MA AND MA <= state_max@1, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time], metrics=[bytes_scanned=474, elapsed_compute=1.234ms, mem_used=0, num_predicate_creation_errors=0, output_rows=4, page_index_eval_time=1.234ms, page_index_rows_filtered=0, predicate_evaluation_errors=0, pushdown_eval_time=1.234ms, pushdown_rows_filtered=0, row_groups_pruned=0, spill_count=0, spilled_bytes=0, time_elapsed_opening=1.234ms, time_elapsed_processing=1.234ms, time_elapsed_scanning_total=1.234ms, time_elapsed_scanning_until_data=1.234ms] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=state = Dictionary(Int32, Utf8("MA")), pruning_predicate=state_min@0 <= MA AND MA <= state_max@1, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time], metrics=[bytes_scanned=632, elapsed_compute=1.234ms, mem_used=0, num_predicate_creation_errors=0, output_rows=3, page_index_eval_time=1.234ms, page_index_rows_filtered=0, predicate_evaluation_errors=0, pushdown_eval_time=1.234ms, pushdown_rows_filtered=3, row_groups_pruned=0, spill_count=0, spilled_bytes=0, time_elapsed_opening=1.234ms, time_elapsed_processing=1.234ms, time_elapsed_scanning_total=1.234ms, time_elapsed_scanning_until_data=1.234ms] |
| | ParquetExec: limit=None, partitions={2 groups: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, predicate=state = Dictionary(Int32, Utf8("MA")), pruning_predicate=state_min@0 <= MA AND MA <= state_max@1, projection=[area, city, max_temp, min_temp, state, time], metrics=[bytes_scanned=1219, elapsed_compute=1.234ms, mem_used=0, num_predicate_creation_errors=0, output_rows=5, page_index_eval_time=1.234ms, page_index_rows_filtered=0, predicate_evaluation_errors=0, pushdown_eval_time=1.234ms, pushdown_rows_filtered=5, row_groups_pruned=0, spill_count=0, spilled_bytes=0, time_elapsed_opening=1.234ms, time_elapsed_processing=1.234ms, time_elapsed_scanning_total=1.234ms, time_elapsed_scanning_until_data=1.234ms] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=state@4 = MA, pruning_predicate=state_min@0 <= MA AND MA <= state_max@1, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time], metrics=[bytes_scanned=474, elapsed_compute=1.234ms, mem_used=0, num_predicate_creation_errors=0, output_rows=4, page_index_eval_time=1.234ms, page_index_rows_filtered=0, predicate_evaluation_errors=0, pushdown_eval_time=1.234ms, pushdown_rows_filtered=0, row_groups_pruned=0, spill_count=0, spilled_bytes=0, time_elapsed_opening=1.234ms, time_elapsed_processing=1.234ms, time_elapsed_scanning_total=1.234ms, time_elapsed_scanning_until_data=1.234ms] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=state@4 = MA, pruning_predicate=state_min@0 <= MA AND MA <= state_max@1, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time], metrics=[bytes_scanned=632, elapsed_compute=1.234ms, mem_used=0, num_predicate_creation_errors=0, output_rows=3, page_index_eval_time=1.234ms, page_index_rows_filtered=0, predicate_evaluation_errors=0, pushdown_eval_time=1.234ms, pushdown_rows_filtered=3, row_groups_pruned=0, spill_count=0, spilled_bytes=0, time_elapsed_opening=1.234ms, time_elapsed_processing=1.234ms, time_elapsed_scanning_total=1.234ms, time_elapsed_scanning_until_data=1.234ms] |
| | ParquetExec: limit=None, partitions={2 groups: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, predicate=state@4 = MA, pruning_predicate=state_min@0 <= MA AND MA <= state_max@1, projection=[area, city, max_temp, min_temp, state, time], metrics=[bytes_scanned=1219, elapsed_compute=1.234ms, mem_used=0, num_predicate_creation_errors=0, output_rows=5, page_index_eval_time=1.234ms, page_index_rows_filtered=0, predicate_evaluation_errors=0, pushdown_eval_time=1.234ms, pushdown_rows_filtered=5, row_groups_pruned=0, spill_count=0, spilled_bytes=0, time_elapsed_opening=1.234ms, time_elapsed_processing=1.234ms, time_elapsed_scanning_total=1.234ms, time_elapsed_scanning_until_data=1.234ms] |
| | |
----------

@@ -20,7 +20,7 @@
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@0 >= 957528000000000000 AND time@0 <= 957531540000000000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time >= TimestampNanosecond(957528000000000000, None) AND time <= TimestampNanosecond(957531540000000000, None), pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, projection=[time, user] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@0 >= 957528000000000000 AND time@0 <= 957531540000000000, pruning_predicate=time_max@0 >= 957528000000000000 AND time_min@1 <= 957531540000000000, projection=[time, user] |
| | |
----------
-- SQL: SELECT date_bin_gapfill(interval '10 minute', time, timestamp '1970-01-01T00:00:00Z') as minute, count(cpu.user) from cpu where time between timestamp '2000-05-05T12:00:00Z' and timestamp '2000-05-05T12:59:00Z' group by minute;

@@ -42,7 +42,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: count@0 > 200 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200), pruning_predicate=count_max@0 > 200, projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count@0 > 200, pruning_predicate=count_max@0 > 200, projection=[count, system, time, town] |
| | |
----------
-- SQL: EXPLAIN SELECT * from restaurant where count > 200.0;
@@ -54,7 +54,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Float64) > Float64(200)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: CAST(count@0 AS Float64) > 200 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=CAST(count AS Float64) > Float64(200), projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=CAST(count@0 AS Float64) > 200, projection=[count, system, time, town] |
| | |
----------
-- SQL: EXPLAIN SELECT * from restaurant where system > 4.0;
@@ -66,7 +66,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(4)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: system@1 > 4 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(4), pruning_predicate=system_max@0 > 4, projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system@1 > 4, pruning_predicate=system_max@0 > 4, projection=[count, system, time, town] |
| | |
----------
-- SQL: SELECT * from restaurant where count > 200 and town != 'tewsbury';
@@ -89,7 +89,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury"))] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: count@0 > 200 AND town@3 != tewsbury |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")), pruning_predicate=count_max@0 > 200 AND (town_min@1 != tewsbury OR tewsbury != town_max@2), projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count@0 > 200 AND town@3 != tewsbury, pruning_predicate=count_max@0 > 200 AND (town_min@1 != tewsbury OR tewsbury != town_max@2), projection=[count, system, time, town] |
| | |
----------
-- SQL: SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence');
@@ -111,7 +111,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: count@0 > 200 AND town@3 != tewsbury AND (system@1 = 5 OR town@3 = lawrence) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")) AND (system = Float64(5) OR town = Dictionary(Int32, Utf8("lawrence"))), pruning_predicate=count_max@0 > 200 AND (town_min@1 != tewsbury OR tewsbury != town_max@2) AND (system_min@3 <= 5 AND 5 <= system_max@4 OR town_min@1 <= lawrence AND lawrence <= town_max@2), projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count@0 > 200 AND town@3 != tewsbury AND (system@1 = 5 OR town@3 = lawrence), pruning_predicate=count_max@0 > 200 AND (town_min@1 != tewsbury OR tewsbury != town_max@2) AND (system_min@3 <= 5 AND 5 <= system_max@4 OR town_min@1 <= lawrence AND lawrence <= town_max@2), projection=[count, system, time, town] |
| | |
----------
-- SQL: SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000;
@@ -132,7 +132,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")), restaurant.count < UInt64(40000)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: count@0 > 200 AND town@3 != tewsbury AND (system@1 = 5 OR town@3 = lawrence) AND count@0 < 40000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")) AND (system = Float64(5) OR town = Dictionary(Int32, Utf8("lawrence"))) AND count < UInt64(40000), pruning_predicate=count_max@0 > 200 AND (town_min@1 != tewsbury OR tewsbury != town_max@2) AND (system_min@3 <= 5 AND 5 <= system_max@4 OR town_min@1 <= lawrence AND lawrence <= town_max@2) AND count_min@5 < 40000, projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count@0 > 200 AND town@3 != tewsbury AND (system@1 = 5 OR town@3 = lawrence) AND count@0 < 40000, pruning_predicate=count_max@0 > 200 AND (town_min@1 != tewsbury OR tewsbury != town_max@2) AND (system_min@3 <= 5 AND 5 <= system_max@4 OR town_min@1 <= lawrence AND lawrence <= town_max@2) AND count_min@5 < 40000, projection=[count, system, time, town] |
| | |
----------
-- SQL: SELECT * from restaurant where count > 200 and count < 40000;
@@ -155,7 +155,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.count < UInt64(40000)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: count@0 > 200 AND count@0 < 40000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND count < UInt64(40000), pruning_predicate=count_max@0 > 200 AND count_min@1 < 40000, projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count@0 > 200 AND count@0 < 40000, pruning_predicate=count_max@0 > 200 AND count_min@1 < 40000, projection=[count, system, time, town] |
| | |
----------
-- SQL: SELECT * from restaurant where system > 4.0 and system < 7.0;
@@ -179,7 +179,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(4), restaurant.system < Float64(7)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: system@1 > 4 AND system@1 < 7 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(4) AND system < Float64(7), pruning_predicate=system_max@0 > 4 AND system_min@1 < 7, projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system@1 > 4 AND system@1 < 7, pruning_predicate=system_max@0 > 4 AND system_min@1 < 7, projection=[count, system, time, town] |
| | |
----------
-- SQL: SELECT * from restaurant where system > 5.0 and system < 7.0;
@@ -200,7 +200,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(5), restaurant.system < Float64(7)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: system@1 > 5 AND system@1 < 7 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND system < Float64(7), pruning_predicate=system_max@0 > 5 AND system_min@1 < 7, projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system@1 > 5 AND system@1 < 7, pruning_predicate=system_max@0 > 5 AND system_min@1 < 7, projection=[count, system, time, town] |
| | |
----------
-- SQL: SELECT * from restaurant where system > 5.0 and town != 'tewsbury' and 7.0 > system;
@@ -220,7 +220,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(5), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), Float64(7) > restaurant.system] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: system@1 > 5 AND town@3 != tewsbury AND 7 > system@1 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND town != Dictionary(Int32, Utf8("tewsbury")) AND Float64(7) > system, pruning_predicate=system_max@0 > 5 AND (town_min@1 != tewsbury OR tewsbury != town_max@2) AND system_min@3 < 7, projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system@1 > 5 AND town@3 != tewsbury AND 7 > system@1, pruning_predicate=system_max@0 > 5 AND (town_min@1 != tewsbury OR tewsbury != town_max@2) AND system_min@3 < 7, projection=[count, system, time, town] |
| | |
----------
-- SQL: SELECT * from restaurant where system > 5.0 and 'tewsbury' != town and system < 7.0 and (count = 632 or town = 'reading');
@@ -239,7 +239,7 @@
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(5), Dictionary(Int32, Utf8("tewsbury")) != restaurant.town, restaurant.system < Float64(7), restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading"))] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: system@1 > 5 AND tewsbury != town@3 AND system@1 < 7 AND (count@0 = 632 OR town@3 = reading) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND Dictionary(Int32, Utf8("tewsbury")) != town AND system < Float64(7) AND (count = UInt64(632) OR town = Dictionary(Int32, Utf8("reading"))), pruning_predicate=system_max@0 > 5 AND (town_min@1 != tewsbury OR tewsbury != town_max@2) AND system_min@3 < 7 AND (count_min@4 <= 632 AND 632 <= count_max@5 OR town_min@1 <= reading AND reading <= town_max@2), projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system@1 > 5 AND tewsbury != town@3 AND system@1 < 7 AND (count@0 = 632 OR town@3 = reading), pruning_predicate=system_max@0 > 5 AND (town_min@1 != tewsbury OR tewsbury != town_max@2) AND system_min@3 < 7 AND (count_min@4 <= 632 AND 632 <= count_max@5 OR town_min@1 <= reading AND reading <= town_max@2), projection=[count, system, time, town] |
| | |
----------
-- SQL: SELECT * from restaurant where 5.0 < system and town != 'tewsbury' and system < 7.0 and (count = 632 or town = 'reading') and time > to_timestamp('1970-01-01T00:00:00.000000130+00:00');
@@ -294,6 +294,6 @@
| | FilterExec: (CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %foo% OR CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %bar% OR CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %baz%) AND CAST(restaurant.town AS Utf8)restaurant.town@0 NOT LIKE %one% AND CAST(restaurant.town AS Utf8)restaurant.town@0 NOT LIKE %two% |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%foo%") OR CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%bar%") OR CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%baz%")) AND CAST(town AS Utf8) AS restaurant.town NOT LIKE Utf8("%one%") AND CAST(town AS Utf8) AS restaurant.town NOT LIKE Utf8("%two%") AND (CAST(town AS Utf8) LIKE Utf8("%foo%") OR CAST(town AS Utf8) LIKE Utf8("%bar%") OR CAST(town AS Utf8) LIKE Utf8("%baz%")) AND CAST(town AS Utf8) NOT LIKE Utf8("%one%") AND CAST(town AS Utf8) NOT LIKE Utf8("%two%"), projection=[count, system, time, town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town@3 AS Utf8) LIKE %foo% OR CAST(town@3 AS Utf8) LIKE %bar% OR CAST(town@3 AS Utf8) LIKE %baz%) AND CAST(town@3 AS Utf8) NOT LIKE %one% AND CAST(town@3 AS Utf8) NOT LIKE %two% AND (CAST(town@3 AS Utf8) LIKE %foo% OR CAST(town@3 AS Utf8) LIKE %bar% OR CAST(town@3 AS Utf8) LIKE %baz%) AND CAST(town@3 AS Utf8) NOT LIKE %one% AND CAST(town@3 AS Utf8) NOT LIKE %two%, projection=[count, system, time, town] |
| | |
----------

@@ -53,9 +53,9 @@
| | UnionExec |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: <REDACTED>
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=host@0 != b, pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: <REDACTED>
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=host@0 != b, pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | |
----------

@@ -81,11 +81,11 @@
| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time >= TimestampNanosecond(250, None), pruning_predicate=time_max@0 >= 250, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=time >= TimestampNanosecond(250, None), pruning_predicate=time_max@0 >= 250, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time@4 >= 250, pruning_predicate=time_max@0 >= 250, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=time@4 >= 250, pruning_predicate=time_max@0 >= 250, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: expr=[city@0 ASC,state@2 ASC,time@4 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, predicate=time >= TimestampNanosecond(250, None), pruning_predicate=time_max@0 >= 250, projection=[city, other_temp, state, temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, predicate=time@4 >= 250, pruning_predicate=time_max@0 >= 250, projection=[city, other_temp, state, temp, time] |
| | |
----------

@@ -241,6 +241,9 @@ impl InfluxRpcPlanner {
// Special case predicates that span the entire valid timestamp range
let rpc_predicate = rpc_predicate.clear_timestamp_if_max_range();
let metadata_ctx = ctx.child_ctx("apply_predicate_to_metadata");
let metadata_ctx = &metadata_ctx; // needed to use inside the move closure
let table_predicates = rpc_predicate
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
@@ -250,6 +253,7 @@ impl InfluxRpcPlanner {
// Identify which chunks can answer from their metadata alone and record their table,
// and which chunks need a full plan, grouping those by table
let mut chunks_full = vec![];
for chunk in cheap_chunk_first(chunks) {
trace!(chunk_id=%chunk.id(), %table_name, "Considering table");
@@ -260,7 +264,7 @@
} else {
// Try and apply the predicate using only metadata
let pred_result = chunk
.apply_predicate_to_metadata(predicate)
.apply_predicate_to_metadata(metadata_ctx, predicate)
.context(CheckingChunkPredicateSnafu {
chunk_id: chunk.id(),
})?;
@@ -384,7 +388,7 @@ impl InfluxRpcPlanner {
for chunk in cheap_chunk_first(chunks) {
// Try and apply the predicate using only metadata
let pred_result = chunk.apply_predicate_to_metadata(predicate).context(
let pred_result = chunk.apply_predicate_to_metadata(&ctx, predicate).context(
CheckingChunkPredicateSnafu {
chunk_id: chunk.id(),
},
@@ -524,6 +528,9 @@ impl InfluxRpcPlanner {
table_predicates_filtered.push((table_name, predicate));
}
let metadata_ctx = ctx.child_ctx("apply_predicate_to_metadata");
let metadata_ctx = &metadata_ctx; // needed to use inside the move closure
let tables: Vec<_> = table_chunk_stream(
Arc::clone(&namespace),
false,
@@ -536,11 +543,11 @@
for chunk in cheap_chunk_first(chunks) {
// Try and apply the predicate using only metadata
let pred_result = chunk.apply_predicate_to_metadata(predicate).context(
CheckingChunkPredicateSnafu {
chunk_id: chunk.id(),
},
)?;
let pred_result = chunk
.apply_predicate_to_metadata(metadata_ctx, predicate)
.context(CheckingChunkPredicateSnafu {
chunk_id: chunk.id(),
})?;
if matches!(pred_result, PredicateMatch::Zero) {
continue;
@@ -1567,9 +1574,12 @@ where
+ Sync,
P: Send,
{
let metadata_ctx = ctx.child_ctx("apply_predicate_to_metadata");
let metadata_ctx = &metadata_ctx; // needed to use inside the move closure
table_chunk_stream(Arc::clone(&namespace), true, table_predicates, &ctx)
.and_then(|(table_name, predicate, chunks)| async move {
let chunks = prune_chunks_metadata(chunks, predicate)?;
let chunks = prune_chunks_metadata(metadata_ctx, chunks, predicate)?;
Ok((table_name, predicate, chunks))
})
// rustc seems to be heavily confused about the filter step here, esp. it dislikes `.try_filter` and even
@@ -1603,18 +1613,18 @@ where
///
/// TODO: Should this logic live with the rest of the chunk pruning logic?
fn prune_chunks_metadata(
ctx: &IOxSessionContext,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: &Predicate,
) -> Result<Vec<Arc<dyn QueryChunk>>> {
let mut filtered = Vec::with_capacity(chunks.len());
for chunk in chunks {
// Try and apply the predicate using only metadata
let pred_result =
chunk
.apply_predicate_to_metadata(predicate)
.context(CheckingChunkPredicateSnafu {
chunk_id: chunk.id(),
})?;
let pred_result = chunk.apply_predicate_to_metadata(ctx, predicate).context(
CheckingChunkPredicateSnafu {
chunk_id: chunk.id(),
},
)?;
trace!(?pred_result, chunk_id=?chunk.id(), "applied predicate to metadata");
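
The repeated `let metadata_ctx = &metadata_ctx;` rebinding above is the usual trick for sharing a value across `move` closures: each closure then captures (and copies) a shared reference instead of taking ownership. A standalone illustration of the pattern, with a plain `String` standing in for the `IOxSessionContext`:

```rust
fn main() {
    let ctx = String::from("apply_predicate_to_metadata");

    // Rebind as a shared reference: `&String` is `Copy`, the `String` is not,
    // so `move` closures copy the reference rather than consuming the value.
    let ctx = &ctx;

    let first = move || println!("first closure uses {ctx}");
    let second = move || println!("second closure uses {ctx}");
    first();
    second();
}
```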

@@ -232,9 +232,15 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
/// predicates.
fn apply_predicate_to_metadata(
&self,
ctx: &IOxSessionContext,
predicate: &Predicate,
) -> Result<PredicateMatch, DataFusionError> {
Ok(predicate.apply_to_table_summary(&self.summary(), self.schema().as_arrow()))
let state = ctx.inner().state();
Ok(predicate.apply_to_table_summary(
state.execution_props(),
&self.summary(),
self.schema().as_arrow(),
))
}
/// Returns a set of Strings with column names from the specified

@@ -6,6 +6,7 @@ use crate::{
};
use datafusion::{
datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
physical_expr::execution_props::ExecutionProps,
physical_plan::{
empty::EmptyExec,
file_format::{FileScanConfig, ParquetExec},
@@ -13,7 +14,9 @@ use datafusion::{
ExecutionPlan, Statistics,
},
};
use datafusion_util::create_physical_expr_from_schema;
use object_store::ObjectMeta;
use observability_deps::tracing::warn;
use predicate::Predicate;
use schema::{sort::SortKey, Schema};
use std::{
@@ -190,6 +193,18 @@ pub fn chunks_to_physical_nodes(
let output_ordering =
sort_key.map(|sort_key| arrow_sort_key_exprs(&sort_key, &file_schema));
let props = ExecutionProps::new();
let filter_expr = predicate.filter_expr()
.and_then(|filter_expr| {
match create_physical_expr_from_schema(&props, &filter_expr, &file_schema) {
Ok(f) => Some(f),
Err(e) => {
warn!(%e, ?filter_expr, "Error creating physical filter expression, can not push down");
None
}
}
});
let base_config = FileScanConfig {
object_store_url,
file_schema,
@@ -202,7 +217,8 @@
infinite_source: false,
};
let meta_size_hint = None;
let parquet_exec = ParquetExec::new(base_config, predicate.filter_expr(), meta_size_hint);
let parquet_exec = ParquetExec::new(base_config, filter_expr, meta_size_hint);
output_nodes.push(Arc::new(parquet_exec));
}
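
`ParquetExec::new` at this DataFusion revision expects the predicate as an already-lowered physical expression, which is why the conversion now happens eagerly above, and a conversion failure merely disables pushdown instead of failing the query: the `FilterExec` above the scan, visible in the plans earlier in this diff, still applies the filter. A standalone sketch of that degrade-gracefully shape, with a string-based converter standing in for the real lowering:

```rust
// Stand-in converter: pretend expressions mentioning a UDF cannot be lowered.
fn try_lower(expr: &str) -> Result<String, String> {
    if expr.contains("udf") {
        Err(format!("no physical form for {expr:?}"))
    } else {
        Ok(format!("physical<{expr}>"))
    }
}

fn main() {
    for expr in ["host = 'a'", "my_udf(host)"] {
        // A lowering failure only disables pushdown; correctness is preserved
        // because the filter still runs above the scan.
        let pushdown = match try_lower(expr) {
            Ok(physical) => Some(physical),
            Err(e) => {
                eprintln!("warning: {e}; scanning without predicate pushdown");
                None
            }
        };
        println!("{expr:?} => pushdown: {pushdown:?}");
    }
}
```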

@@ -9,9 +9,10 @@ use arrow::{
};
use data_types::{StatValues, Statistics, TableSummary};
use datafusion::{
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
physical_expr::execution_props::ExecutionProps, physical_optimizer::pruning::PruningStatistics,
prelude::Column,
};
use datafusion_util::create_pruning_predicate;
use observability_deps::tracing::{debug, trace, warn};
use predicate::Predicate;
use query_functions::group_by::Aggregate;
@@ -98,8 +99,8 @@ pub fn prune_summaries(
};
trace!(%filter_expr, "Filter_expr of pruning chunks");
// no information about the queries here
let props = ExecutionProps::new();
let pruning_predicate =
match PruningPredicate::try_new(filter_expr.clone(), table_schema.as_arrow()) {
match create_pruning_predicate(&props, &filter_expr, &table_schema.as_arrow()) {
Ok(p) => p,
Err(e) => {
warn!(%e, ?filter_expr, "Can not create pruning predicate");
@@ -123,7 +126,7 @@
}
/// Wraps a collection of [`QueryChunk`] and implements the [`PruningStatistics`]
/// interface required by [`PruningPredicate`]
/// interface required for pruning
struct ChunkPruningStatistics<'a> {
table_schema: &'a Schema,
summaries: &'a [Arc<TableSummary>],

@@ -116,7 +116,7 @@ impl QueryNamespace for TestDatabase {
table_name: &str,
predicate: &Predicate,
_projection: Option<&Vec<usize>>,
_ctx: IOxSessionContext,
ctx: IOxSessionContext,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
// save last predicate
*self.chunks_predicate.lock() = predicate.clone();
@@ -130,7 +130,11 @@
// only keep chunks if their statistics overlap
.filter(|c| {
!matches!(
predicate.apply_to_table_summary(&c.table_summary, c.schema.as_arrow()),
predicate.apply_to_table_summary(
ctx.inner().state().execution_props(),
&c.table_summary,
c.schema.as_arrow()
),
PredicateMatch::Zero
)
})
@@ -1172,6 +1176,7 @@ impl QueryChunk for TestChunk {
fn apply_predicate_to_metadata(
&self,
_ctx: &IOxSessionContext,
predicate: &Predicate,
) -> Result<PredicateMatch, DataFusionError> {
self.check_error()?;

@@ -30,10 +30,11 @@ use datafusion::{
BinaryExpr,
},
optimizer::utils::split_conjunction,
physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
physical_expr::execution_props::ExecutionProps,
physical_optimizer::pruning::PruningStatistics,
prelude::{col, lit_timestamp_nano, Expr},
};
use datafusion_util::{make_range_expr, nullable_schema, AsExpr};
use datafusion_util::{create_pruning_predicate, make_range_expr, nullable_schema, AsExpr};
use observability_deps::tracing::debug;
use rpc_predicate::VALUE_COLUMN_NAME;
use schema::TIME_COLUMN_NAME;
@@ -250,6 +251,7 @@ impl Predicate {
/// look at actual data.
pub fn apply_to_table_summary(
&self,
props: &ExecutionProps,
table_summary: &TableSummary,
schema: SchemaRef,
) -> PredicateMatch {
@@ -263,7 +265,7 @@
let schema = nullable_schema(schema);
if let Some(expr) = self.filter_expr() {
match PruningPredicate::try_new(expr.clone(), Arc::clone(&schema)) {
match create_pruning_predicate(props, &expr, &schema) {
Ok(pp) => {
match pp.prune(&summary) {
Ok(matched) => {
@@ -776,6 +778,7 @@ mod tests {
#[test]
fn test_apply_to_table_summary() {
maybe_start_logging();
let props = ExecutionProps::new();
let p = Predicate::new()
.with_range(100, 200)
@@ -806,7 +809,7 @@
}],
};
assert_eq!(
p.apply_to_table_summary(&summary, schema.as_arrow()),
p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
PredicateMatch::Zero,
);
@@ -824,7 +827,7 @@
}],
};
assert_eq!(
p.apply_to_table_summary(&summary, schema.as_arrow()),
p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
PredicateMatch::Unknown,
);
@@ -842,7 +845,7 @@
}],
};
assert_eq!(
p.apply_to_table_summary(&summary, schema.as_arrow()),
p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
PredicateMatch::Zero,
);
@@ -860,7 +863,7 @@
}],
};
assert_eq!(
p.apply_to_table_summary(&summary, schema.as_arrow()),
p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
PredicateMatch::Zero,
);
@@ -878,7 +881,7 @@
}],
};
assert_eq!(
p.apply_to_table_summary(&summary, schema.as_arrow()),
p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
PredicateMatch::Unknown,
)
}
@@ -887,6 +890,7 @@
#[test]
fn test_apply_to_table_summary_partially_unsupported() {
maybe_start_logging();
let props = ExecutionProps::new();
let p = Predicate::new()
.with_range(100, 200)
@@ -926,7 +930,7 @@
],
};
assert_eq!(
p.apply_to_table_summary(&summary, schema.as_arrow()),
p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
PredicateMatch::Zero,
);
}

@@ -29,7 +29,7 @@ bytes = { version = "1" }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
crossbeam-utils = { version = "0.8" }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "03fbf9fecad00f8d6eb3e72e72ba16252b28b1d6" }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1" }
fixedbitset = { version = "0.4" }