diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs
index 863dff1550..e1f76107ae 100644
--- a/iox_query/src/provider.rs
+++ b/iox_query/src/provider.rs
@@ -3,7 +3,7 @@
 use async_trait::async_trait;
 use data_types::DeletePredicate;
 use hashbrown::HashMap;
-use std::sync::Arc;
+use std::{collections::HashSet, sync::Arc};
 
 use arrow::{
     datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef},
@@ -14,6 +14,7 @@ use datafusion::{
     error::{DataFusionError, Result as DataFusionResult},
     execution::context::SessionState,
     logical_expr::{TableProviderFilterPushDown, TableType},
+    optimizer::utils::{conjunction, split_conjunction},
     physical_plan::{
         expressions::{col as physical_col, PhysicalSortExpr},
         filter::FilterExec,
@@ -259,7 +260,8 @@ impl ChunkTableProvider {
                 .chain(std::iter::once(chunk_order_field()))
                 .collect(),
         ));
-        let dedup_sort_key = SortKey::from_columns(self.iox_schema().primary_key());
+        let pk = self.iox_schema().primary_key();
+        let dedup_sort_key = SortKey::from_columns(pk.iter().copied());
 
         let mut chunks_by_delete_predicates =
             HashMap::<&[Arc<DeletePredicate>], Vec<Arc<dyn QueryChunk>>>::new();
@@ -315,10 +317,31 @@
             .chain(negated_del_expr_val)
             .reduce(|a, b| a.and(b))
         {
-            Arc::new(FilterExec::try_new(
-                df_physical_expr(plan.as_ref(), expr)?,
-                plan,
-            )?)
+            let maybe_expr = if !self.deduplication {
+                let dedup_cols = pk.into_iter().collect::<HashSet<_>>();
+                conjunction(
+                    split_conjunction(&expr)
+                        .into_iter()
+                        .filter(|expr| {
+                            let Ok(expr_cols) = expr.to_columns() else { return false };
+                            expr_cols
+                                .into_iter()
+                                .all(|c| dedup_cols.contains(c.name.as_str()))
+                        })
+                        .cloned(),
+                )
+            } else {
+                Some(expr)
+            };
+
+            if let Some(expr) = maybe_expr {
+                Arc::new(FilterExec::try_new(
+                    df_physical_expr(plan.as_ref(), expr)?,
+                    plan,
+                )?)
+            } else {
+                plan
+            }
         } else {
             plan
         };
@@ -389,8 +412,10 @@ impl TableProvider for ChunkTableProvider {
     ) -> DataFusionResult<TableProviderFilterPushDown> {
         if influxdb_iox_pre_6098_planner() {
             Ok(TableProviderFilterPushDown::Inexact)
-        } else {
+        } else if self.deduplication {
             Ok(TableProviderFilterPushDown::Exact)
+        } else {
+            Ok(TableProviderFilterPushDown::Inexact)
         }
     }
@@ -1488,12 +1513,18 @@ impl Deduplicater {
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::test::{format_execution_plan, raw_data, TestChunk};
+    use crate::{
+        test::{format_execution_plan, raw_data, TestChunk},
+        QueryChunkMeta,
+    };
     use arrow::datatypes::DataType;
     use arrow_util::{
         assert_batches_eq, assert_batches_sorted_eq, test_util::equalize_batch_schemas,
     };
-    use datafusion::physical_plan::displayable;
+    use datafusion::{
+        physical_plan::displayable,
+        prelude::{col, lit},
+    };
     use datafusion_util::test_collect;
     use schema::{builder::SchemaBuilder, TIME_COLUMN_NAME};
     use std::num::NonZeroU64;
@@ -3664,6 +3695,450 @@ mod test {
         );
     }
 
+    #[tokio::test]
+    async fn provider_scan_default() {
+        let table_name = "t";
+        let chunk1 = Arc::new(
+            TestChunk::new(table_name)
+                .with_id(1)
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_f64_field_column("field")
+                .with_time_column(),
+        ) as Arc<dyn QueryChunk>;
+        let chunk2 = Arc::new(
+            TestChunk::new(table_name)
+                .with_id(2)
+                .with_dummy_parquet_file()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_f64_field_column("field")
+                .with_time_column(),
+        ) as Arc<dyn QueryChunk>;
+        let schema = chunk1.schema().clone();
+        let ctx = IOxSessionContext::with_testing();
+
+        let provider = ProviderBuilder::new(Arc::from(table_name), schema, ctx)
+            .add_chunk(Arc::clone(&chunk1))
+            .add_chunk(Arc::clone(&chunk2))
+            .build()
+            .unwrap();
+        let state = provider.ctx.inner().state();
+
+        // simple plan
+        let plan = provider.scan(&state, None, &[], None).await.unwrap();
+        insta::assert_yaml_snapshot!(
+            format_execution_plan(&plan),
+            @r###"
+        ---
+        - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
+        - "   DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+        - "     UnionExec"
+        - "       RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
+        - "       ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        "###
+        );
+
+        // projection
+        let plan = provider
+            .scan(&state, Some(&vec![1, 3]), &[], None)
+            .await
+            .unwrap();
+        insta::assert_yaml_snapshot!(
+            format_execution_plan(&plan),
+            @r###"
+        ---
+        - " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]"
+        - "   DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+        - "     UnionExec"
+        - "       RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
+        - "       ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        "###
+        );
+
+        // filters
+        let expr = vec![lit(false)];
+        let expr_ref = expr.iter().collect::<Vec<_>>();
+        assert_eq!(
+            provider.supports_filters_pushdown(&expr_ref).unwrap(),
+            vec![TableProviderFilterPushDown::Exact]
+        );
+        let plan = provider.scan(&state, None, &expr, None).await.unwrap();
+        insta::assert_yaml_snapshot!(
+            format_execution_plan(&plan),
+            @r###"
+        ---
+        - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
+        - "   FilterExec: false"
+        - "     DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+        - "       UnionExec"
+        - "         RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
+        - "         ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        "###
+        );
+
+        // limit pushdown is unimplemented at the moment
+        let plan = provider.scan(&state, None, &[], Some(1)).await.unwrap();
+        insta::assert_yaml_snapshot!(
+            format_execution_plan(&plan),
+            @r###"
+        ---
+        - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
+        - "   DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+        - "     UnionExec"
+        - "       RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
+        - "       ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        "###
+        );
+    }
+
+    #[tokio::test]
+    async fn provider_scan_no_dedup() {
+        let table_name = "t";
+        let chunk1 = Arc::new(
+            TestChunk::new(table_name)
+                .with_id(1)
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_f64_field_column("field")
+                .with_time_column(),
+        ) as Arc<dyn QueryChunk>;
+        let chunk2 = Arc::new(
+            TestChunk::new(table_name)
+                .with_id(2)
+                .with_dummy_parquet_file()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_f64_field_column("field")
+                .with_time_column(),
+        ) as Arc<dyn QueryChunk>;
+        let schema = chunk1.schema().clone();
+        let ctx = IOxSessionContext::with_testing();
+
+        let provider = ProviderBuilder::new(Arc::from(table_name), schema, ctx)
+            .add_chunk(Arc::clone(&chunk1))
+            .add_chunk(Arc::clone(&chunk2))
+            .with_enable_deduplication(false)
+            .build()
+            .unwrap();
+        let state = provider.ctx.inner().state();
+
+        // simple plan
+        let plan = provider.scan(&state, None, &[], None).await.unwrap();
+        insta::assert_yaml_snapshot!(
+            format_execution_plan(&plan),
@r###" + --- + - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]" + - " UnionExec" + - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]" + "### + ); + + // projection + let plan = provider + .scan(&state, Some(&vec![1, 3]), &[], None) + .await + .unwrap(); + insta::assert_yaml_snapshot!( + format_execution_plan(&plan), + @r###" + --- + - " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]" + - " UnionExec" + - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]" + "### + ); + + // filters + // Expressions on fields are NOT pushed down because they cannot be pushed through de-dup. + let expr = vec![ + lit(false), + col("tag1").eq(lit("foo")), + col("field").eq(lit(1.0)), + ]; + let expr_ref = expr.iter().collect::>(); + assert_eq!( + provider.supports_filters_pushdown(&expr_ref).unwrap(), + vec![ + TableProviderFilterPushDown::Inexact, + TableProviderFilterPushDown::Inexact, + TableProviderFilterPushDown::Inexact + ] + ); + let plan = provider.scan(&state, None, &expr, None).await.unwrap(); + insta::assert_yaml_snapshot!( + format_execution_plan(&plan), + @r###" + --- + - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]" + - " FilterExec: false AND tag1@1 = CAST(foo AS Dictionary(Int32, Utf8))" + - " UnionExec" + - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]" + "### + ); + + // limit pushdown is unimplemented at the moment + let plan = provider.scan(&state, None, &[], Some(1)).await.unwrap(); + insta::assert_yaml_snapshot!( + format_execution_plan(&plan), + @r###" + --- + - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]" + - " UnionExec" + - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]" + "### + ); + } + + #[tokio::test] + async fn provider_scan_sorted() { + let table_name = "t"; + let chunk1 = Arc::new( + TestChunk::new(table_name) + .with_id(1) + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_f64_field_column("field") + .with_time_column(), + ) as Arc; + let chunk2 = Arc::new( + TestChunk::new(table_name) + .with_id(2) + .with_dummy_parquet_file() + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_f64_field_column("field") + .with_time_column(), + ) as Arc; + let schema = chunk1.schema().clone(); + let ctx = IOxSessionContext::with_testing(); + + let provider = ProviderBuilder::new(Arc::from(table_name), schema, ctx) + .add_chunk(Arc::clone(&chunk1)) + .add_chunk(Arc::clone(&chunk2)) + .with_output_sort_key(SortKey::from_columns(["tag2", "tag1"])) + .build() + .unwrap(); + let state = provider.ctx.inner().state(); + + // simple plan + let plan = provider.scan(&state, None, &[], None).await.unwrap(); + insta::assert_yaml_snapshot!( + format_execution_plan(&plan), + @r###" + --- + - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, 
time@3 as time]" + - " SortExec: expr=[tag2@2 ASC,tag1@1 ASC]" + - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" + - " UnionExec" + - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]" + "### + ); + + // projection + let plan = provider + .scan(&state, Some(&vec![1, 3]), &[], None) + .await + .unwrap(); + insta::assert_yaml_snapshot!( + format_execution_plan(&plan), + @r###" + --- + - " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]" + - " SortExec: expr=[tag2@2 ASC,tag1@1 ASC]" + - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" + - " UnionExec" + - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]" + "### + ); + + // filters + let expr = vec![lit(false)]; + let expr_ref = expr.iter().collect::>(); + assert_eq!( + provider.supports_filters_pushdown(&expr_ref).unwrap(), + vec![TableProviderFilterPushDown::Exact] + ); + let plan = provider.scan(&state, None, &expr, None).await.unwrap(); + insta::assert_yaml_snapshot!( + format_execution_plan(&plan), + @r###" + --- + - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]" + - " SortExec: expr=[tag2@2 ASC,tag1@1 ASC]" + - " FilterExec: false" + - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" + - " UnionExec" + - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]" + "### + ); + + // limit pushdown is unimplemented at the moment + let plan = provider.scan(&state, None, &[], Some(1)).await.unwrap(); + insta::assert_yaml_snapshot!( + format_execution_plan(&plan), + @r###" + --- + - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]" + - " SortExec: expr=[tag2@2 ASC,tag1@1 ASC]" + - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" + - " UnionExec" + - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]" + "### + ); + } + + #[tokio::test] + async fn provider_scan_retention() { + let table_name = "t"; + let pred = Arc::new(DeletePredicate::retention_delete_predicate(100)); + let chunk1 = Arc::new( + TestChunk::new(table_name) + .with_id(1) + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_f64_field_column("field") + .with_time_column() + .with_delete_predicate(Arc::clone(&pred)), + ) as Arc; + let chunk2 = Arc::new( + TestChunk::new(table_name) + .with_id(2) + .with_dummy_parquet_file() + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_f64_field_column("field") + .with_time_column() + .with_delete_predicate(Arc::clone(&pred)), + ) as Arc; + let schema = chunk1.schema().clone(); + let ctx = IOxSessionContext::with_testing(); + + let provider = ProviderBuilder::new(Arc::from(table_name), schema, ctx) + .add_chunk(Arc::clone(&chunk1)) + .add_chunk(Arc::clone(&chunk2)) + .build() + .unwrap(); + let state = provider.ctx.inner().state(); + + // simple plan + let plan = provider.scan(&state, None, &[], None).await.unwrap(); + 
+        insta::assert_yaml_snapshot!(
+            format_execution_plan(&plan),
+            @r###"
+        ---
+        - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
+        - "   FilterExec: time@3 < -9223372036854775808 OR time@3 > 100"
+        - "     DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+        - "       UnionExec"
+        - "         RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
+        - "         ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        "###
+        );
+
+        // projection
+        let plan = provider
+            .scan(&state, Some(&vec![1, 3]), &[], None)
+            .await
+            .unwrap();
+        insta::assert_yaml_snapshot!(
+            format_execution_plan(&plan),
+            @r###"
+        ---
+        - " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]"
+        - "   FilterExec: time@3 < -9223372036854775808 OR time@3 > 100"
+        - "     DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+        - "       UnionExec"
+        - "         RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
+        - "         ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        "###
+        );
+
+        // filters
+        let expr = vec![lit(false)];
+        let expr_ref = expr.iter().collect::<Vec<_>>();
+        assert_eq!(
+            provider.supports_filters_pushdown(&expr_ref).unwrap(),
+            vec![TableProviderFilterPushDown::Exact]
+        );
+        let plan = provider.scan(&state, None, &expr, None).await.unwrap();
+        insta::assert_yaml_snapshot!(
+            format_execution_plan(&plan),
+            @r###"
+        ---
+        - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
+        - "   FilterExec: false AND (time@3 < -9223372036854775808 OR time@3 > 100)"
+        - "     DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+        - "       UnionExec"
+        - "         RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
+        - "         ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        "###
+        );
+
+        // limit pushdown is unimplemented at the moment
+        let plan = provider.scan(&state, None, &[], Some(1)).await.unwrap();
+        insta::assert_yaml_snapshot!(
+            format_execution_plan(&plan),
+            @r###"
+        ---
+        - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
+        - "   FilterExec: time@3 < -9223372036854775808 OR time@3 > 100"
+        - "     DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
+        - "       UnionExec"
+        - "         RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
+        - "         ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
+        "###
+        );
+    }
+
+    #[tokio::test]
+    async fn provider_scan_multiple_delete_predicates() {
+        let table_name = "t";
+        let chunk1 = Arc::new(
+            TestChunk::new(table_name)
+                .with_id(1)
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_f64_field_column("field")
+                .with_time_column()
+                .with_delete_predicate(Arc::new(DeletePredicate::retention_delete_predicate(100))),
+        ) as Arc<dyn QueryChunk>;
+        let chunk2 = Arc::new(
+            TestChunk::new(table_name)
+                .with_id(2)
+                .with_dummy_parquet_file()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_f64_field_column("field")
+                .with_time_column()
+                .with_delete_predicate(Arc::new(DeletePredicate::retention_delete_predicate(200))),
+        ) as Arc<dyn QueryChunk>;
+        let schema = chunk1.schema().clone();
+        let ctx = IOxSessionContext::with_testing();
+
+        let provider = ProviderBuilder::new(Arc::from(table_name), schema, ctx)
+            .add_chunk(Arc::clone(&chunk1))
+            .add_chunk(Arc::clone(&chunk2))
+            .build()
+            .unwrap();
+        let state = provider.ctx.inner().state();
+
+        let err = provider.scan(&state, None, &[], None).await.unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "External error: expected at most 1 delete predicate set, got 2"
+        );
+    }
+
     fn chunk_ids(group: &[Arc<dyn QueryChunk>]) -> String {
         let ids = group
             .iter()
diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs
index 1acc235b69..abe8a81bf6 100644
--- a/iox_query/src/test.rs
+++ b/iox_query/src/test.rs
@@ -442,6 +442,11 @@ impl TestChunk {
         }
     }
 
+    pub fn with_delete_predicate(mut self, pred: Arc<DeletePredicate>) -> Self {
+        self.delete_predicates.push(pred);
+        self
+    }
+
     pub fn with_order(self, order: i64) -> Self {
         Self {
             order: ChunkOrder::new(order),
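
Note on the predicate handling above: with deduplication disabled, the provider can no longer promise that a pushed-down filter is applied exactly, so `supports_filter_pushdown` now reports `Inexact` and the `scan` path keeps only the conjuncts that reference primary-key columns exclusively. Below is a minimal, self-contained sketch of that splitting step, assuming the DataFusion version this PR builds against (where `split_conjunction`/`conjunction` live in `optimizer::utils` and `Expr::to_columns` exists); `pk_only_conjuncts` and the `pk` set are illustrative names, not IOx API:

```rust
use std::collections::HashSet;

use datafusion::logical_expr::Expr;
use datafusion::optimizer::utils::{conjunction, split_conjunction};
use datafusion::prelude::{col, lit};

/// Keep only the conjuncts of `expr` whose column references are all
/// primary-key columns; everything else must stay above deduplication.
/// Returns `None` when no conjunct survives, i.e. no filter is applied.
fn pk_only_conjuncts(expr: &Expr, pk_cols: &HashSet<&str>) -> Option<Expr> {
    conjunction(
        split_conjunction(expr) // `a AND b AND c` -> [a, b, c]
            .into_iter()
            .filter(|e| {
                // Drop any conjunct whose column set cannot be determined.
                let Ok(cols) = e.to_columns() else { return false };
                cols.iter().all(|c| pk_cols.contains(c.name.as_str()))
            })
            .cloned(),
    )
}

fn main() {
    let pk: HashSet<&str> = ["tag1", "tag2", "time"].into_iter().collect();
    // `field` is not part of the primary key, so its conjunct is dropped.
    let expr = col("tag1").eq(lit("foo")).and(col("field").eq(lit(1.0)));
    assert_eq!(pk_only_conjuncts(&expr, &pk), Some(col("tag1").eq(lit("foo"))));
}
```

Conjuncts that reference no columns at all (e.g. `lit(false)`) pass the check vacuously, which is why the `provider_scan_no_dedup` snapshot still shows `FilterExec: false AND tag1@1 = ...` while the `field = 1.0` predicate is dropped.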