test: `ChunkTableProvider::scan` + fix "no dedup" (#7448)

1. Add loads of tests for `ChunkTableProvider::scan` (i.e. the naive
   physical plan before any physical optimizers run).
2. Fix the interaction of "no de-dup" mode and predicate pushdown. The
   ingester may use this code path at some point, and I would like it
   to be correct before someone silently introduces a bug by pushing
   field predicates into the ingester.

This is mostly prep work for #7406, so I know that test coverage is
sufficient.
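
In short: with deduplication disabled, the scan reports `Inexact` pushdown and only applies conjuncts that reference primary-key (tag/time) columns; DataFusion re-applies the full filter above the scan, so dropping the other conjuncts is safe. A minimal sketch of that logic, assuming the DataFusion version pinned by this repo (the helper name `retain_pk_conjuncts` is hypothetical; it mirrors the change in `ChunkTableProvider::scan` below):

```rust
use std::collections::HashSet;

use datafusion::logical_expr::Expr;
use datafusion::optimizer::utils::{conjunction, split_conjunction};

/// Keep only the conjuncts of `expr` whose columns are all part of the
/// primary key; return `None` if nothing survives (=> no `FilterExec`
/// is added to the plan).
fn retain_pk_conjuncts(expr: &Expr, pk: &HashSet<&str>) -> Option<Expr> {
    conjunction(
        split_conjunction(expr)
            .into_iter()
            // drop conjuncts that touch field columns or whose columns
            // cannot be determined
            .filter(|e| match e.to_columns() {
                Ok(cols) => cols.iter().all(|c| pk.contains(c.name.as_str())),
                Err(_) => false,
            })
            .cloned(),
    )
}
```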
Marco Neumann 2023-04-06 10:39:53 +02:00 committed by GitHub
parent 9f5fec42b8
commit 30b1878171
2 changed files with 489 additions and 9 deletions


@@ -3,7 +3,7 @@
use async_trait::async_trait;
use data_types::DeletePredicate;
use hashbrown::HashMap;
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
use arrow::{
datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef},
@@ -14,6 +14,7 @@ use datafusion::{
error::{DataFusionError, Result as DataFusionResult},
execution::context::SessionState,
logical_expr::{TableProviderFilterPushDown, TableType},
optimizer::utils::{conjunction, split_conjunction},
physical_plan::{
expressions::{col as physical_col, PhysicalSortExpr},
filter::FilterExec,
@@ -259,7 +260,8 @@ impl ChunkTableProvider {
.chain(std::iter::once(chunk_order_field()))
.collect(),
));
let dedup_sort_key = SortKey::from_columns(self.iox_schema().primary_key());
let pk = self.iox_schema().primary_key();
let dedup_sort_key = SortKey::from_columns(pk.iter().copied());
let mut chunks_by_delete_predicates =
HashMap::<&[Arc<DeletePredicate>], Vec<Arc<dyn QueryChunk>>>::new();
@@ -315,10 +317,31 @@
.chain(negated_del_expr_val)
.reduce(|a, b| a.and(b))
{
Arc::new(FilterExec::try_new(
df_physical_expr(plan.as_ref(), expr)?,
plan,
)?)
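// Without dedup the scan advertises `Inexact` pushdown (see
// `supports_filters_pushdown` below), so it may keep only the
// conjuncts that reference primary-key columns; DataFusion
// re-applies the full filter above the scan.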
let maybe_expr = if !self.deduplication {
let dedup_cols = pk.into_iter().collect::<HashSet<_>>();
conjunction(
split_conjunction(&expr)
.into_iter()
.filter(|expr| {
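// conservatively drop conjuncts whose columns cannot be determined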
let Ok(expr_cols) = expr.to_columns() else {return false};
expr_cols
.into_iter()
.all(|c| dedup_cols.contains(c.name.as_str()))
})
.cloned(),
)
} else {
Some(expr)
};
if let Some(expr) = maybe_expr {
Arc::new(FilterExec::try_new(
df_physical_expr(plan.as_ref(), expr)?,
plan,
)?)
} else {
plan
}
} else {
plan
};
@@ -389,8 +412,10 @@ impl TableProvider for ChunkTableProvider {
) -> DataFusionResult<TableProviderFilterPushDown> {
if influxdb_iox_pre_6098_planner() {
Ok(TableProviderFilterPushDown::Inexact)
} else {
} else if self.deduplication {
Ok(TableProviderFilterPushDown::Exact)
} else {
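// without dedup, only primary-key predicates are applied inside
// the scan, so DataFusion must re-check every filter itself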
Ok(TableProviderFilterPushDown::Inexact)
}
}
@@ -1488,12 +1513,18 @@ impl Deduplicater {
#[cfg(test)]
mod test {
use super::*;
use crate::test::{format_execution_plan, raw_data, TestChunk};
use crate::{
test::{format_execution_plan, raw_data, TestChunk},
QueryChunkMeta,
};
use arrow::datatypes::DataType;
use arrow_util::{
assert_batches_eq, assert_batches_sorted_eq, test_util::equalize_batch_schemas,
};
use datafusion::physical_plan::displayable;
use datafusion::{
physical_plan::displayable,
prelude::{col, lit},
};
use datafusion_util::test_collect;
use schema::{builder::SchemaBuilder, TIME_COLUMN_NAME};
use std::num::NonZeroU64;
@@ -3664,6 +3695,450 @@ mod test {
);
}
#[tokio::test]
async fn provider_scan_default() {
let table_name = "t";
let chunk1 = Arc::new(
TestChunk::new(table_name)
.with_id(1)
.with_tag_column("tag1")
.with_tag_column("tag2")
.with_f64_field_column("field")
.with_time_column(),
) as Arc<dyn QueryChunk>;
let chunk2 = Arc::new(
TestChunk::new(table_name)
.with_id(2)
.with_dummy_parquet_file()
.with_tag_column("tag1")
.with_tag_column("tag2")
.with_f64_field_column("field")
.with_time_column(),
) as Arc<dyn QueryChunk>;
let schema = chunk1.schema().clone();
let ctx = IOxSessionContext::with_testing();
let provider = ProviderBuilder::new(Arc::from(table_name), schema, ctx)
.add_chunk(Arc::clone(&chunk1))
.add_chunk(Arc::clone(&chunk2))
.build()
.unwrap();
let state = provider.ctx.inner().state();
// simple plan
let plan = provider.scan(&state, None, &[], None).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// projection
let plan = provider
.scan(&state, Some(&vec![1, 3]), &[], None)
.await
.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// filters
let expr = vec![lit(false)];
let expr_ref = expr.iter().collect::<Vec<_>>();
assert_eq!(
provider.supports_filters_pushdown(&expr_ref).unwrap(),
vec![TableProviderFilterPushDown::Exact]
);
let plan = provider.scan(&state, None, &expr, None).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " FilterExec: false"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// limit pushdown is unimplemented at the moment
let plan = provider.scan(&state, None, &[], Some(1)).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
}
#[tokio::test]
async fn provider_scan_no_dedup() {
let table_name = "t";
let chunk1 = Arc::new(
TestChunk::new(table_name)
.with_id(1)
.with_tag_column("tag1")
.with_tag_column("tag2")
.with_f64_field_column("field")
.with_time_column(),
) as Arc<dyn QueryChunk>;
let chunk2 = Arc::new(
TestChunk::new(table_name)
.with_id(2)
.with_dummy_parquet_file()
.with_tag_column("tag1")
.with_tag_column("tag2")
.with_f64_field_column("field")
.with_time_column(),
) as Arc<dyn QueryChunk>;
let schema = chunk1.schema().clone();
let ctx = IOxSessionContext::with_testing();
let provider = ProviderBuilder::new(Arc::from(table_name), schema, ctx)
.add_chunk(Arc::clone(&chunk1))
.add_chunk(Arc::clone(&chunk2))
.with_enable_deduplication(false)
.build()
.unwrap();
let state = provider.ctx.inner().state();
// simple plan
let plan = provider.scan(&state, None, &[], None).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// projection
let plan = provider
.scan(&state, Some(&vec![1, 3]), &[], None)
.await
.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// filters
// Expressions on fields are NOT pushed down: the scan output is not
// de-duplicated, and field predicates do not commute with a later de-dup.
let expr = vec![
lit(false),
col("tag1").eq(lit("foo")),
col("field").eq(lit(1.0)),
];
let expr_ref = expr.iter().collect::<Vec<_>>();
assert_eq!(
provider.supports_filters_pushdown(&expr_ref).unwrap(),
vec![
TableProviderFilterPushDown::Inexact,
TableProviderFilterPushDown::Inexact,
TableProviderFilterPushDown::Inexact
]
);
let plan = provider.scan(&state, None, &expr, None).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " FilterExec: false AND tag1@1 = CAST(foo AS Dictionary(Int32, Utf8))"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// limit pushdown is unimplemented at the moment
let plan = provider.scan(&state, None, &[], Some(1)).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
}
#[tokio::test]
async fn provider_scan_sorted() {
let table_name = "t";
let chunk1 = Arc::new(
TestChunk::new(table_name)
.with_id(1)
.with_tag_column("tag1")
.with_tag_column("tag2")
.with_f64_field_column("field")
.with_time_column(),
) as Arc<dyn QueryChunk>;
let chunk2 = Arc::new(
TestChunk::new(table_name)
.with_id(2)
.with_dummy_parquet_file()
.with_tag_column("tag1")
.with_tag_column("tag2")
.with_f64_field_column("field")
.with_time_column(),
) as Arc<dyn QueryChunk>;
let schema = chunk1.schema().clone();
let ctx = IOxSessionContext::with_testing();
let provider = ProviderBuilder::new(Arc::from(table_name), schema, ctx)
.add_chunk(Arc::clone(&chunk1))
.add_chunk(Arc::clone(&chunk2))
.with_output_sort_key(SortKey::from_columns(["tag2", "tag1"]))
.build()
.unwrap();
let state = provider.ctx.inner().state();
// simple plan
let plan = provider.scan(&state, None, &[], None).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " SortExec: expr=[tag2@2 ASC,tag1@1 ASC]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// projection
let plan = provider
.scan(&state, Some(&vec![1, 3]), &[], None)
.await
.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]"
- " SortExec: expr=[tag2@2 ASC,tag1@1 ASC]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// filters
let expr = vec![lit(false)];
let expr_ref = expr.iter().collect::<Vec<_>>();
assert_eq!(
provider.supports_filters_pushdown(&expr_ref).unwrap(),
vec![TableProviderFilterPushDown::Exact]
);
let plan = provider.scan(&state, None, &expr, None).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " SortExec: expr=[tag2@2 ASC,tag1@1 ASC]"
- " FilterExec: false"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// limit pushdown is unimplemented at the moment
let plan = provider.scan(&state, None, &[], Some(1)).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " SortExec: expr=[tag2@2 ASC,tag1@1 ASC]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
}
#[tokio::test]
async fn provider_scan_retention() {
let table_name = "t";
let pred = Arc::new(DeletePredicate::retention_delete_predicate(100));
let chunk1 = Arc::new(
TestChunk::new(table_name)
.with_id(1)
.with_tag_column("tag1")
.with_tag_column("tag2")
.with_f64_field_column("field")
.with_time_column()
.with_delete_predicate(Arc::clone(&pred)),
) as Arc<dyn QueryChunk>;
let chunk2 = Arc::new(
TestChunk::new(table_name)
.with_id(2)
.with_dummy_parquet_file()
.with_tag_column("tag1")
.with_tag_column("tag2")
.with_f64_field_column("field")
.with_time_column()
.with_delete_predicate(Arc::clone(&pred)),
) as Arc<dyn QueryChunk>;
let schema = chunk1.schema().clone();
let ctx = IOxSessionContext::with_testing();
let provider = ProviderBuilder::new(Arc::from(table_name), schema, ctx)
.add_chunk(Arc::clone(&chunk1))
.add_chunk(Arc::clone(&chunk2))
.build()
.unwrap();
let state = provider.ctx.inner().state();
// simple plan
let plan = provider.scan(&state, None, &[], None).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " FilterExec: time@3 < -9223372036854775808 OR time@3 > 100"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// projection
let plan = provider
.scan(&state, Some(&vec![1, 3]), &[], None)
.await
.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]"
- " FilterExec: time@3 < -9223372036854775808 OR time@3 > 100"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// filters
let expr = vec![lit(false)];
let expr_ref = expr.iter().collect::<Vec<_>>();
assert_eq!(
provider.supports_filters_pushdown(&expr_ref).unwrap(),
vec![TableProviderFilterPushDown::Exact]
);
let plan = provider.scan(&state, None, &expr, None).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " FilterExec: false AND (time@3 < -9223372036854775808 OR time@3 > 100)"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
// limit pushdown is unimplemented at the moment
let plan = provider.scan(&state, None, &[], Some(1)).await.unwrap();
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " FilterExec: time@3 < -9223372036854775808 OR time@3 > 100"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " ParquetExec: limit=None, partitions={1 group: [[2.parquet]]}, output_ordering=[__chunk_order@4 ASC], projection=[field, tag1, tag2, time, __chunk_order]"
"###
);
}
#[tokio::test]
async fn provider_scan_multiple_delete_predicates() {
let table_name = "t";
let chunk1 = Arc::new(
TestChunk::new(table_name)
.with_id(1)
.with_tag_column("tag1")
.with_tag_column("tag2")
.with_f64_field_column("field")
.with_time_column()
.with_delete_predicate(Arc::new(DeletePredicate::retention_delete_predicate(100))),
) as Arc<dyn QueryChunk>;
let chunk2 = Arc::new(
TestChunk::new(table_name)
.with_id(2)
.with_dummy_parquet_file()
.with_tag_column("tag1")
.with_tag_column("tag2")
.with_f64_field_column("field")
.with_time_column()
.with_delete_predicate(Arc::new(DeletePredicate::retention_delete_predicate(200))),
) as Arc<dyn QueryChunk>;
let schema = chunk1.schema().clone();
let ctx = IOxSessionContext::with_testing();
let provider = ProviderBuilder::new(Arc::from(table_name), schema, ctx)
.add_chunk(Arc::clone(&chunk1))
.add_chunk(Arc::clone(&chunk2))
.build()
.unwrap();
let state = provider.ctx.inner().state();
let err = provider.scan(&state, None, &[], None).await.unwrap_err();
assert_eq!(
err.to_string(),
"External error: expected at most 1 delete predicate set, got 2"
);
}
fn chunk_ids(group: &[Arc<dyn QueryChunk>]) -> String {
let ids = group
.iter()


@@ -442,6 +442,11 @@ impl TestChunk {
}
}
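/// Register a delete predicate for this chunk.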
pub fn with_delete_predicate(mut self, pred: Arc<DeletePredicate>) -> Self {
self.delete_predicates.push(pred);
self
}
pub fn with_order(self, order: i64) -> Self {
Self {
order: ChunkOrder::new(order),