refactor: remove old query planning code (#7449)

Closes #7406.
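This removes the INFLUXDB_IOX_PRE_6098_PLANNER escape hatch and the plumbing it required: ReorgPlanner and ScanPlanBuilder no longer carry an IOxSessionContext, chunks_to_physical_nodes no longer takes a Predicate, and the IOx-specific physical optimizer rules are now registered unconditionally. A condensed before/after sketch of the call-site change (variable names are illustrative, not taken from any single file):

    // before
    let ctx = exec.new_context(ExecutorType::Reorg);
    let plan = ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
        .compact_plan(table_name, &schema, chunks, sort_key)?;
    let nodes = chunks_to_physical_nodes(&arrow_schema, None, chunks, Predicate::new(), 2);

    // after: the session context is still created to execute the plan,
    // it is just no longer threaded through plan construction
    let plan = ReorgPlanner::new()
        .compact_plan(table_name, &schema, chunks, sort_key)?;
    let nodes = chunks_to_physical_nodes(&arrow_schema, None, chunks, 2);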

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-04-06 18:05:08 +02:00 committed by GitHub
parent 612f2451ee
commit 5f43f2a719
26 changed files with 164 additions and 3744 deletions

Cargo.lock generated

@ -3054,7 +3054,6 @@ dependencies = [
"itertools",
"object_store",
"observability_deps",
"once_cell",
"parking_lot 0.12.1",
"parquet_file",
"predicate",


@ -350,10 +350,9 @@ impl CompactPlanBuilder {
let (small_cutoff_bytes, large_cutoff_bytes) =
cutoff_bytes(max_desired_file_size_bytes, percentage_max_file_size);
let ctx = exec.new_context(ExecutorType::Reorg);
let plan = if total_size <= small_cutoff_bytes {
// Compact everything into one file
ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
ReorgPlanner::new()
.compact_plan(
Arc::from(partition.table.name.clone()),
&merged_schema,
@ -381,7 +380,7 @@ impl CompactPlanBuilder {
if split_times.is_empty() || (split_times.len() == 1 && split_times[0] == max_time) {
// The split times might not have actually split anything, so in this case, compact
// everything into one file
ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
ReorgPlanner::new()
.compact_plan(
Arc::from(partition.table.name.clone()),
&merged_schema,
@ -391,7 +390,7 @@ impl CompactPlanBuilder {
.context(CompactLogicalPlanSnafu)?
} else {
// split compact query plan
ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
ReorgPlanner::new()
.split_plan(
Arc::from(partition.table.name.clone()),
&merged_schema,
@ -532,9 +531,8 @@ impl CompactPlanBuilder {
.expect("no partition sort key in catalog")
.filter_to(&merged_schema.primary_key(), partition_id.get());
let ctx = exec.new_context(ExecutorType::Reorg);
// Compact everything into one file
let plan = ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
let plan = ReorgPlanner::new()
.compact_plan(
Arc::from(partition.table.name.clone()),
&merged_schema,


@ -59,7 +59,7 @@ impl DataFusionPlanner for V1DataFusionPlanner {
.expect("no partition sort key in catalog")
.filter_to(&merged_schema.primary_key(), partition.partition_id.get());
ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
ReorgPlanner::new()
.compact_plan(
Arc::from(partition.table.name.clone()),
&merged_schema,
@ -84,7 +84,7 @@ impl DataFusionPlanner for V1DataFusionPlanner {
.expect("no partition sort key in catalog")
.filter_to(&merged_schema.primary_key(), partition.partition_id.get());
ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
ReorgPlanner::new()
.split_plan(
Arc::from(partition.table.name.clone()),
&merged_schema,


@ -137,7 +137,7 @@ pub(crate) async fn compact(
) -> Result<SendableRecordBatchStream> {
// Build logical plan for compaction
let ctx = executor.new_context(ExecutorType::Reorg);
let logical_plan = ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
let logical_plan = ReorgPlanner::new()
.compact_plan(
table_name.into(),
data.schema(),


@ -81,7 +81,7 @@ pub(super) async fn compact_persisting_batch(
// Build logical plan for compaction
let ctx = executor.new_context(ExecutorType::Reorg);
let logical_plan = ReorgPlanner::new(ctx.child_ctx("ReorgPlanner"))
let logical_plan = ReorgPlanner::new()
.compact_plan(
table_name.into(),
batch.schema(),


@ -29,7 +29,6 @@ indexmap = { version = "1.9", features = ["std"] }
itertools = "0.10.5"
object_store = "0.5.6"
observability_deps = { path = "../observability_deps" }
once_cell = "1"
parking_lot = "0.12"
parquet_file = { path = "../parquet_file" }
query_functions = { path = "../query_functions"}


@ -16,7 +16,7 @@ mod test {
use schema::{merge::SchemaMerger, sort::SortKey, Schema};
use crate::{
exec::{split::StreamSplitExec, Executor, ExecutorType, IOxSessionContext},
exec::{split::StreamSplitExec, Executor, ExecutorType},
frontend::reorg::ReorgPlanner,
provider::{DeduplicateExec, RecordBatchesExec},
test::{format_execution_plan, TestChunk},
@ -62,10 +62,9 @@ mod test {
test_helpers::maybe_start_logging();
// Create 2 overlapped chunks
let (schema, chunks) = get_test_overlapped_chunks();
let ctx = IOxSessionContext::with_testing();
// Build a logical plan with deduplication
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema, ctx.child_ctx("scan_plan"))
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema)
.with_chunks(chunks)
.build()
.unwrap();
@ -123,10 +122,9 @@ mod test {
test_helpers::maybe_start_logging();
// Create 2 overlapped chunks
let (schema, chunks) = get_test_chunks();
let ctx = IOxSessionContext::with_testing();
// Build a logical plan without deduplication
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema, ctx.child_ctx("scan_plan"))
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema)
.with_chunks(chunks)
// force it to not deduplicate
.enable_deduplication(false)
@ -188,10 +186,9 @@ mod test {
// Create 2 overlapped chunks
let (schema, chunks) = get_test_chunks();
let sort_key = SortKey::from_columns(vec!["time", "tag1"]);
let ctx = IOxSessionContext::with_testing();
// Build a logical plan without deduplication but sort
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema, ctx.child_ctx("scan_plan"))
let scan_plan = ScanPlanBuilder::new(Arc::from("t"), &schema)
.with_chunks(chunks)
// force it to not deduplicate
.enable_deduplication(false)
@ -243,7 +240,7 @@ mod test {
let sort_key = SortKey::from_columns(vec!["time", "tag1"]);
// Use a split plan as it has StreamSplitExec, DeduplicateExec and IOxReadFilterNode
let split_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
let split_plan = ReorgPlanner::new()
.split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000])
.expect("created compact plan");


@ -10,7 +10,6 @@ use schema::{sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use crate::{
exec::IOxSessionContext,
provider::{ChunkTableProvider, ProviderBuilder},
util::MissingColumnsToNull,
QueryChunk,
@ -86,7 +85,6 @@ impl ScanPlan {
#[derive(Debug)]
pub struct ScanPlanBuilder<'a> {
ctx: IOxSessionContext,
table_name: Arc<str>,
/// The schema of the resulting table (any chunks that don't have
/// all the necessary columns will be extended appropriately)
@ -100,9 +98,8 @@ pub struct ScanPlanBuilder<'a> {
}
impl<'a> ScanPlanBuilder<'a> {
pub fn new(table_name: Arc<str>, table_schema: &'a Schema, ctx: IOxSessionContext) -> Self {
pub fn new(table_name: Arc<str>, table_schema: &'a Schema) -> Self {
Self {
ctx,
table_name,
table_schema,
chunks: vec![],
@ -144,7 +141,6 @@ impl<'a> ScanPlanBuilder<'a> {
/// Creates a `ScanPlan` from the specified chunks
pub fn build(self) -> Result<ScanPlan> {
let Self {
ctx,
table_name,
chunks,
output_sort_key,
@ -156,12 +152,8 @@ impl<'a> ScanPlanBuilder<'a> {
assert!(!chunks.is_empty(), "no chunks provided");
// Prepare the plan for the table
let mut builder = ProviderBuilder::new(
Arc::clone(&table_name),
table_schema.clone(),
ctx.child_ctx("provider_builder"),
)
.with_enable_deduplication(deduplication);
let mut builder = ProviderBuilder::new(Arc::clone(&table_name), table_schema.clone())
.with_enable_deduplication(deduplication);
if let Some(output_sort_key) = output_sort_key {
// Tell the scan of this provider to sort its output on the given sort_key


@ -9,10 +9,7 @@ use datafusion::{
use observability_deps::tracing::debug;
use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
use crate::{
exec::{make_stream_split, IOxSessionContext},
QueryChunk,
};
use crate::{exec::make_stream_split, QueryChunk};
use snafu::{ResultExt, Snafu};
use super::common::ScanPlanBuilder;
@ -52,14 +49,12 @@ impl From<datafusion::error::DataFusionError> for Error {
/// Planner for physically rearranging chunk data. This planner
/// creates COMPACT and SPLIT plans for use in the database lifecycle manager
#[derive(Debug)]
pub struct ReorgPlanner {
ctx: IOxSessionContext,
}
#[derive(Debug, Default)]
pub struct ReorgPlanner {}
impl ReorgPlanner {
pub fn new(ctx: IOxSessionContext) -> Self {
Self { ctx }
pub fn new() -> Self {
Self::default()
}
/// Creates an execution plan for the COMPACT operations which does the following:
@ -84,12 +79,11 @@ impl ReorgPlanner {
where
I: IntoIterator<Item = Arc<dyn QueryChunk>>,
{
let scan_plan =
ScanPlanBuilder::new(table_name, schema, self.ctx.child_ctx("compact_plan"))
.with_chunks(chunks)
.with_output_sort_key(output_sort_key)
.build()
.context(BuildingScanSnafu)?;
let scan_plan = ScanPlanBuilder::new(table_name, schema)
.with_chunks(chunks)
.with_output_sort_key(output_sort_key)
.build()
.context(BuildingScanSnafu)?;
let plan = scan_plan.plan_builder.build()?;
@ -180,7 +174,7 @@ impl ReorgPlanner {
panic!("Split plan does not accept empty split_times");
}
let scan_plan = ScanPlanBuilder::new(table_name, schema, self.ctx.child_ctx("split_plan"))
let scan_plan = ScanPlanBuilder::new(table_name, schema)
.with_chunks(chunks)
.with_output_sort_key(output_sort_key)
.build()
@ -349,7 +343,7 @@ mod test {
.with_col_opts(TIME_COLUMN_NAME, false, true)
.build();
let compact_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
let compact_plan = ReorgPlanner::new()
.compact_plan(Arc::from("t"), &schema, chunks, sort_key)
.expect("created compact plan");
@ -386,7 +380,7 @@ mod test {
.with_col_opts(TIME_COLUMN_NAME, false, false)
.build();
let compact_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
let compact_plan = ReorgPlanner::new()
.compact_plan(Arc::from("t"), &schema, chunks, sort_key)
.expect("created compact plan");
@ -454,7 +448,7 @@ mod test {
.build();
// split on 1000 should have timestamps 1000, 5000, and 7000
let split_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
let split_plan = ReorgPlanner::new()
.split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000])
.expect("created compact plan");
@ -535,7 +529,7 @@ mod test {
.build();
// split on 1000 and 7000
let split_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
let split_plan = ReorgPlanner::new()
.split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000, 7000])
.expect("created compact plan");
@ -628,7 +622,7 @@ mod test {
.build();
// split on 1000 and 7000
let _split_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
let _split_plan = ReorgPlanner::new()
.split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![]) // reason of panic: empty split_times
.expect("created compact plan");
}
@ -647,7 +641,7 @@ mod test {
.build();
// split on 1000 and 7000
let _split_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
let _split_plan = ReorgPlanner::new()
.split_plan(Arc::from("t"), &schema, chunks, sort_key, vec![1000, 500]) // reason of panic: split_times not in ascending order
.expect("created compact plan");
}


@ -426,25 +426,6 @@ fn compute_sort_key(summaries: impl Iterator<Item = Arc<TableSummary>>) -> SortK
key
}
/// Should we use the old query path prior to [#6098]?
///
/// This is done to enable the new code path bit-by-bit in production. Also see [k8s-idpe#17927]
///
///
/// [#6098]: https://github.com/influxdata/influxdb_iox/issues/6098
/// [k8s-idpe#17927]: https://github.com/influxdata/k8s-idpe/pull/17927
pub fn influxdb_iox_pre_6098_planner() -> bool {
use once_cell::sync::OnceCell;
static STATE: OnceCell<bool> = OnceCell::new();
*STATE.get_or_init(|| {
std::env::var("INFLUXDB_IOX_PRE_6098_PLANNER")
.ok()
.as_deref()
== Some("1")
})
}
// Note: I would like to compile this module only in the 'test' cfg,
// but when I do so then other modules can not find them. For example:
//
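The influxdb_iox_pre_6098_planner() toggle deleted above is what gated the old query path: it read the INFLUXDB_IOX_PRE_6098_PLANNER environment variable once (cached via once_cell, hence the dependency removals in the Cargo files above) and callers branched on the result. A condensed sketch of the removed caller pattern, as it appeared in the querier's filter-pushdown code further down in this diff:

    if influxdb_iox_pre_6098_planner() {
        // old path: filters were only applied via pruning, so pushdown stayed inexact
        Ok(TableProviderFilterPushDown::Inexact)
    } else {
        Ok(TableProviderFilterPushDown::Exact)
    }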


@ -177,7 +177,6 @@ mod tests {
prelude::{col, lit},
scalar::ScalarValue,
};
use predicate::Predicate;
use schema::{merge::SchemaMerger, sort::SortKeyBuilder, SchemaBuilder, TIME_COLUMN_NAME};
use super::*;
@ -264,20 +263,8 @@ mod tests {
let chunk1 = Arc::new(chunk(1)) as Arc<dyn QueryChunk>;
let schema = chunk1.schema().as_arrow();
let plan = UnionExec::new(vec![
chunks_to_physical_nodes(
&schema,
Some(&sort_key1),
vec![Arc::clone(&chunk1)],
Predicate::default(),
1,
),
chunks_to_physical_nodes(
&schema,
Some(&sort_key2),
vec![chunk1],
Predicate::default(),
1,
),
chunks_to_physical_nodes(&schema, Some(&sort_key1), vec![Arc::clone(&chunk1)], 1),
chunks_to_physical_nodes(&schema, Some(&sort_key2), vec![chunk1], 1),
]);
assert!(extract_chunks(&plan).is_none());
}
@ -286,13 +273,7 @@ mod tests {
fn test_stop_at_other_node_types() {
let chunk1 = chunk(1);
let schema = chunk1.schema().as_arrow();
let plan = chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk1)],
Predicate::default(),
2,
);
let plan = chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk1)], 2);
let plan = FilterExec::try_new(
df_physical_expr(plan.as_ref(), col("tag1").eq(lit("foo"))).unwrap(),
plan,
@ -333,13 +314,7 @@ mod tests {
fn test_parquet_with_predicate_fails() {
let chunk = chunk(1).with_dummy_parquet_file();
let schema = chunk.schema().as_arrow();
let plan = chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk)],
Predicate::default(),
2,
);
let plan = chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], 2);
let plan = plan
.transform_down(&|plan| {
if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() {
@ -362,13 +337,7 @@ mod tests {
chunks: Vec<Arc<dyn QueryChunk>>,
output_sort_key: Option<SortKey>,
) {
let plan = chunks_to_physical_nodes(
&schema,
output_sort_key.as_ref(),
chunks.clone(),
Predicate::default(),
2,
);
let plan = chunks_to_physical_nodes(&schema, output_sort_key.as_ref(), chunks.clone(), 2);
let (schema2, chunks2, output_sort_key2) =
extract_chunks(plan.as_ref()).expect("data found");
assert_eq!(schema, schema2);


@ -7,7 +7,6 @@ use datafusion::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{union::UnionExec, ExecutionPlan},
};
use predicate::Predicate;
use crate::{
physical_optimizer::chunk_extraction::extract_chunks, provider::chunks_to_physical_nodes,
@ -54,7 +53,6 @@ impl PhysicalOptimizerRule for CombineChunks {
&schema,
output_sort_key.as_ref(),
chunks,
Predicate::new(),
config.execution.target_partitions,
);
let Some(union_of_chunks) = union_of_chunks.as_any().downcast_ref::<UnionExec>() else {
@ -98,18 +96,11 @@ mod tests {
let chunk5 = TestChunk::new("table").with_id(5).with_dummy_parquet_file();
let schema = chunk1.schema().as_arrow();
let plan = Arc::new(UnionExec::new(vec![
chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk1), Arc::new(chunk2)],
Predicate::new(),
2,
),
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk1), Arc::new(chunk2)], 2),
chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk3), Arc::new(chunk4), Arc::new(chunk5)],
Predicate::new(),
2,
),
]));
@ -144,18 +135,12 @@ mod tests {
let chunk3 = TestChunk::new("table").with_id(1).with_dummy_parquet_file();
let schema = chunk1.schema().as_arrow();
let plan = Arc::new(UnionExec::new(vec![
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk1)], Predicate::new(), 2),
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk2)], Predicate::new(), 2),
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk1)], 2),
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk2)], 2),
Arc::new(
FilterExec::try_new(
Arc::new(Literal::new(ScalarValue::from(false))),
chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk3)],
Predicate::new(),
2,
),
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk3)], 2),
)
.unwrap(),
),
@ -189,7 +174,7 @@ mod tests {
fn test_no_chunks() {
let chunk1 = TestChunk::new("table").with_id(1);
let schema = chunk1.schema().as_arrow();
let plan = chunks_to_physical_nodes(&schema, None, vec![], Predicate::new(), 2);
let plan = chunks_to_physical_nodes(&schema, None, vec![], 2);
let opt = CombineChunks::default();
let mut config = ConfigOptions::default();
config.execution.target_partitions = 2;
@ -213,13 +198,7 @@ mod tests {
let plan = Arc::new(UnionExec::new(vec![Arc::new(
FilterExec::try_new(
Arc::new(Literal::new(ScalarValue::from(false))),
chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk1)],
Predicate::new(),
2,
),
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk1)], 2),
)
.unwrap(),
)]));


@ -7,7 +7,6 @@ use datafusion::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::ExecutionPlan,
};
use predicate::Predicate;
use schema::{sort::SortKeyBuilder, TIME_COLUMN_NAME};
use crate::{
@ -69,7 +68,6 @@ impl PhysicalOptimizerRule for DedupNullColumns {
&schema,
(!sort_key.is_empty()).then_some(&sort_key),
chunks,
Predicate::new(),
config.execution.target_partitions,
);


@ -9,7 +9,6 @@ use datafusion::{
physical_plan::ExecutionPlan,
};
use indexmap::IndexSet;
use predicate::Predicate;
use schema::{sort::SortKeyBuilder, TIME_COLUMN_NAME};
use crate::{
@ -131,7 +130,6 @@ impl PhysicalOptimizerRule for DedupSortOrder {
&schema,
(!quorum_sort_key.is_empty()).then_some(&quorum_sort_key),
chunks,
Predicate::new(),
config.execution.target_partitions,
);


@ -10,7 +10,6 @@ use datafusion::{
};
use hashbrown::HashMap;
use observability_deps::tracing::warn;
use predicate::Predicate;
use crate::{
config::IoxConfigExt,
@ -86,7 +85,6 @@ impl PhysicalOptimizerRule for PartitionSplit {
&schema,
output_sort_key.as_ref(),
chunks,
Predicate::new(),
config.execution.target_partitions,
),
dedup_exec.sort_keys().to_vec(),


@ -7,7 +7,6 @@ use datafusion::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::ExecutionPlan,
};
use predicate::Predicate;
use crate::{
physical_optimizer::chunk_extraction::extract_chunks,
@ -40,7 +39,6 @@ impl PhysicalOptimizerRule for RemoveDedup {
&schema,
output_sort_key.as_ref(),
chunks,
Predicate::new(),
config.execution.target_partitions,
)));
}


@ -2,7 +2,6 @@ use std::sync::Arc;
use arrow::datatypes::Schema as ArrowSchema;
use datafusion::physical_plan::ExecutionPlan;
use predicate::Predicate;
use schema::Schema;
use crate::{
@ -46,7 +45,7 @@ fn dedup_plan_impl(
} else {
schema.as_arrow()
};
let plan = chunks_to_physical_nodes(&arrow_schema, None, chunks, Predicate::new(), 2);
let plan = chunks_to_physical_nodes(&arrow_schema, None, chunks, 2);
let sort_key = schema::sort::SortKey::from_columns(schema.primary_key());
let sort_exprs = arrow_sort_key_exprs(&sort_key, &plan.schema());


@ -8,7 +8,6 @@ use datafusion::{
physical_plan::{union::UnionExec, ExecutionPlan},
};
use observability_deps::tracing::warn;
use predicate::Predicate;
use crate::{
config::IoxConfigExt,
@ -73,7 +72,6 @@ impl PhysicalOptimizerRule for TimeSplit {
&schema,
output_sort_key.as_ref(),
chunks,
Predicate::new(),
config.execution.target_partitions,
),
dedup_exec.sort_keys().to_vec(),


@ -2,8 +2,6 @@ use std::sync::Arc;
use datafusion::{execution::context::SessionState, physical_optimizer::PhysicalOptimizerRule};
use crate::influxdb_iox_pre_6098_planner;
use self::{
combine_chunks::CombineChunks,
dedup::{
@ -32,30 +30,25 @@ mod test_util;
/// Register IOx-specific [`PhysicalOptimizerRule`]s with the SessionContext
pub fn register_iox_physical_optimizers(state: SessionState) -> SessionState {
if influxdb_iox_pre_6098_planner() {
// keep default
state
} else {
// prepend IOx-specific rules to DataFusion builtins
let mut optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(PartitionSplit::default()),
Arc::new(TimeSplit::default()),
Arc::new(RemoveDedup::default()),
Arc::new(CombineChunks::default()),
Arc::new(DedupNullColumns::default()),
Arc::new(DedupSortOrder::default()),
Arc::new(PredicatePushdown::default()),
Arc::new(ProjectionPushdown::default()),
Arc::new(ParquetSortness::default()) as _,
Arc::new(NestedUnion::default()),
Arc::new(OneUnion::default()),
];
optimizers.append(&mut state.physical_optimizers().to_vec());
optimizers.extend([
Arc::new(SortPushdown::default()) as _,
Arc::new(RedundantSort::default()) as _,
]);
// prepend IOx-specific rules to DataFusion builtins
let mut optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(PartitionSplit::default()),
Arc::new(TimeSplit::default()),
Arc::new(RemoveDedup::default()),
Arc::new(CombineChunks::default()),
Arc::new(DedupNullColumns::default()),
Arc::new(DedupSortOrder::default()),
Arc::new(PredicatePushdown::default()),
Arc::new(ProjectionPushdown::default()),
Arc::new(ParquetSortness::default()) as _,
Arc::new(NestedUnion::default()),
Arc::new(OneUnion::default()),
];
optimizers.append(&mut state.physical_optimizers().to_vec());
optimizers.extend([
Arc::new(SortPushdown::default()) as _,
Arc::new(RedundantSort::default()) as _,
]);
state.with_physical_optimizer_rules(optimizers)
}
state.with_physical_optimizer_rules(optimizers)
}

File diff suppressed because it is too large.


@ -7,7 +7,7 @@ use crate::{
use arrow::datatypes::{DataType, Schema as ArrowSchema, SchemaRef};
use datafusion::{
datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
physical_expr::{execution_props::ExecutionProps, PhysicalSortExpr},
physical_expr::PhysicalSortExpr,
physical_plan::{
empty::EmptyExec,
expressions::Column,
@ -17,10 +17,7 @@ use datafusion::{
},
scalar::ScalarValue,
};
use datafusion_util::create_physical_expr_from_schema;
use object_store::ObjectMeta;
use observability_deps::tracing::warn;
use predicate::Predicate;
use schema::{sort::SortKey, Schema};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
@ -144,7 +141,6 @@ pub fn chunks_to_physical_nodes(
schema: &SchemaRef,
output_sort_key: Option<&SortKey>,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: Predicate,
target_partitions: usize,
) -> Arc<dyn ExecutionPlan> {
if chunks.is_empty() {
@ -240,18 +236,6 @@ pub fn chunks_to_physical_nodes(
// Tell datafusion about the sort key, if any
let output_ordering = sort_key.map(|sort_key| arrow_sort_key_exprs(&sort_key, schema));
let props = ExecutionProps::new();
let filter_expr = predicate.filter_expr()
.and_then(|filter_expr| {
match create_physical_expr_from_schema(&props, &filter_expr, schema) {
Ok(f) => Some(f),
Err(e) => {
warn!(%e, ?filter_expr, "Error creating physical filter expression, can not push down");
None
}
}
});
let (table_partition_cols, file_schema, output_ordering) = if has_chunk_order_col {
let table_partition_cols = vec![(CHUNK_ORDER_COLUMN_NAME.to_owned(), DataType::Int64)];
let file_schema = Arc::new(ArrowSchema::new(
@ -327,7 +311,7 @@ pub fn chunks_to_physical_nodes(
};
let meta_size_hint = None;
let parquet_exec = ParquetExec::new(base_config, filter_expr, meta_size_hint);
let parquet_exec = ParquetExec::new(base_config, None, meta_size_hint);
output_nodes.push(Arc::new(parquet_exec));
}
@ -459,7 +443,7 @@ mod tests {
#[test]
fn test_chunks_to_physical_nodes_empty() {
let schema = TestChunk::new("table").schema().as_arrow();
let plan = chunks_to_physical_nodes(&schema, None, vec![], Predicate::new(), 2);
let plan = chunks_to_physical_nodes(&schema, None, vec![], 2);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
@ -473,8 +457,7 @@ mod tests {
fn test_chunks_to_physical_nodes_recordbatch() {
let chunk = TestChunk::new("table");
let schema = chunk.schema().as_arrow();
let plan =
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2);
let plan = chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], 2);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
@ -489,8 +472,7 @@ mod tests {
fn test_chunks_to_physical_nodes_parquet_one_file() {
let chunk = TestChunk::new("table").with_dummy_parquet_file();
let schema = chunk.schema().as_arrow();
let plan =
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2);
let plan = chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], 2);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
@ -511,7 +493,6 @@ mod tests {
&schema,
None,
vec![Arc::new(chunk1), Arc::new(chunk2), Arc::new(chunk3)],
Predicate::new(),
2,
);
insta::assert_yaml_snapshot!(
@ -533,13 +514,8 @@ mod tests {
.with_id(1)
.with_dummy_parquet_file_and_store("iox2://");
let schema = chunk1.schema().as_arrow();
let plan = chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk1), Arc::new(chunk2)],
Predicate::new(),
2,
);
let plan =
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk1), Arc::new(chunk2)], 2);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
@ -556,13 +532,8 @@ mod tests {
let chunk1 = TestChunk::new("table").with_dummy_parquet_file();
let chunk2 = TestChunk::new("table");
let schema = chunk1.schema().as_arrow();
let plan = chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk1), Arc::new(chunk2)],
Predicate::new(),
2,
);
let plan =
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk1), Arc::new(chunk2)], 2);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
@ -590,13 +561,8 @@ mod tests {
.chain(std::iter::once(chunk_order_field()))
.collect(),
));
let plan = chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk1), Arc::new(chunk2)],
Predicate::new(),
2,
);
let plan =
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk1), Arc::new(chunk2)], 2);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"


@ -309,16 +309,8 @@ impl InfluxRpcPlanner {
table_name: table_name.as_ref(),
})?;
let mut ctx = ctx.child_ctx("table name plan");
ctx.set_metadata("table", table_name.to_string());
let plan = Self::table_name_plan(
ctx,
Arc::clone(table_name),
&schema,
predicate,
chunks,
)?;
let plan =
Self::table_name_plan(Arc::clone(table_name), &schema, predicate, chunks)?;
builder = builder.append_other(plan.into());
}
}
@ -475,10 +467,7 @@ impl InfluxRpcPlanner {
table_name: table_name.as_ref(),
})?;
let mut ctx = ctx.child_ctx("tag_keys_plan");
ctx.set_metadata("table", table_name.to_string());
let plan = self.tag_keys_plan(ctx, table_name, &schema, predicate, chunks_full)?;
let plan = self.tag_keys_plan(table_name, &schema, predicate, chunks_full)?;
if let Some(plan) = plan {
builder = builder.append_other(plan)
@ -647,10 +636,7 @@ impl InfluxRpcPlanner {
table_name: table_name.as_ref(),
})?;
let mut ctx = ctx.child_ctx("scan_and_filter planning");
ctx.set_metadata("table", table_name.to_string());
let scan_and_filter = ScanPlanBuilder::new(Arc::clone(table_name), &schema, ctx)
let scan_and_filter = ScanPlanBuilder::new(Arc::clone(table_name), &schema)
.with_chunks(chunks_full)
.with_predicate(predicate)
.build()?;
@ -736,14 +722,8 @@ impl InfluxRpcPlanner {
namespace,
&table_predicates_need_chunks,
ctx,
|ctx, table_name, predicate, chunks, schema| {
Self::field_columns_plan(
ctx.child_ctx("field_columns plan"),
Arc::from(table_name),
schema,
predicate,
chunks,
)
|table_name, predicate, chunks, schema| {
Self::field_columns_plan(Arc::from(table_name), schema, predicate, chunks)
},
)
.await?;
@ -788,14 +768,8 @@ impl InfluxRpcPlanner {
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| {
Self::read_filter_plan(
ctx.child_ctx("read_filter plan"),
table_name,
schema,
predicate,
chunks,
)
|table_name, predicate, chunks, schema| {
Self::read_filter_plan(table_name, schema, predicate, chunks)
},
)
.await?;
@ -861,7 +835,7 @@ impl InfluxRpcPlanner {
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| {
|table_name, predicate, chunks, schema| {
// check group_columns for unknown columns
let known_tags_vec = schema
.tags_iter()
@ -890,21 +864,10 @@ impl InfluxRpcPlanner {
}
match agg {
Aggregate::None => Self::read_filter_plan(
ctx.child_ctx("read_filter plan"),
table_name,
schema,
predicate,
chunks,
),
_ => Self::read_group_plan(
ctx.child_ctx("read_group plan"),
table_name,
schema,
predicate,
agg,
chunks,
),
Aggregate::None => {
Self::read_filter_plan(table_name, schema, predicate, chunks)
}
_ => Self::read_group_plan(table_name, schema, predicate, agg, chunks),
}
},
)
@ -940,16 +903,9 @@ impl InfluxRpcPlanner {
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| {
|table_name, predicate, chunks, schema| {
Self::read_window_aggregate_plan(
ctx.child_ctx("read_window_aggregate plan"),
table_name,
schema,
predicate,
agg,
every,
offset,
chunks,
table_name, schema, predicate, agg, every, offset, chunks,
)
},
)
@ -970,20 +926,15 @@ impl InfluxRpcPlanner {
/// ```
fn tag_keys_plan(
&self,
ctx: IOxSessionContext,
table_name: &str,
schema: &Schema,
predicate: &Predicate,
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<Option<StringSetPlan>> {
let scan_and_filter = ScanPlanBuilder::new(
Arc::from(table_name),
schema,
ctx.child_ctx("scan_and_filter planning"),
)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
let scan_and_filter = ScanPlanBuilder::new(Arc::from(table_name), schema)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
// now, select only the tag columns
let select_exprs = scan_and_filter
@ -1036,20 +987,15 @@ impl InfluxRpcPlanner {
/// Scan
/// ```
fn field_columns_plan(
ctx: IOxSessionContext,
table_name: Arc<str>,
schema: &Schema,
predicate: &Predicate,
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<LogicalPlan> {
let scan_and_filter = ScanPlanBuilder::new(
table_name,
schema,
ctx.child_ctx("scan_and_filter planning"),
)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
let scan_and_filter = ScanPlanBuilder::new(table_name, schema)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
// Selection of only fields and time
let select_exprs = scan_and_filter
@ -1092,21 +1038,16 @@ impl InfluxRpcPlanner {
/// Scan
/// ```
fn table_name_plan(
ctx: IOxSessionContext,
table_name: Arc<str>,
schema: &Schema,
predicate: &Predicate,
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<LogicalPlan> {
debug!(%table_name, "Creating table_name full plan");
let scan_and_filter = ScanPlanBuilder::new(
Arc::clone(&table_name),
schema,
ctx.child_ctx("scan_and_filter planning"),
)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
let scan_and_filter = ScanPlanBuilder::new(Arc::clone(&table_name), schema)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
// Select only fields requested
let select_exprs: Vec<_> = filtered_fields_iter(scan_and_filter.schema(), predicate)
@ -1137,20 +1078,15 @@ impl InfluxRpcPlanner {
/// Filter(predicate)
/// Scan
fn read_filter_plan(
ctx: IOxSessionContext,
table_name: &str,
schema: &Schema,
predicate: &Predicate,
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<SeriesSetPlan> {
let scan_and_filter = ScanPlanBuilder::new(
Arc::from(table_name),
schema,
ctx.child_ctx("scan_and_filter planning"),
)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
let scan_and_filter = ScanPlanBuilder::new(Arc::from(table_name), schema)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
let schema = scan_and_filter.provider.iox_schema();
@ -1245,21 +1181,16 @@ impl InfluxRpcPlanner {
/// Filter(predicate)
/// Scan
fn read_group_plan(
ctx: IOxSessionContext,
table_name: &str,
schema: &Schema,
predicate: &Predicate,
agg: Aggregate,
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<SeriesSetPlan> {
let scan_and_filter = ScanPlanBuilder::new(
Arc::from(table_name),
schema,
ctx.child_ctx("scan_and_filter planning"),
)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
let scan_and_filter = ScanPlanBuilder::new(Arc::from(table_name), schema)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
// order the tag columns so that the group keys come first (we
// will group and
@ -1354,7 +1285,6 @@ impl InfluxRpcPlanner {
/// Scan
#[allow(clippy::too_many_arguments)]
fn read_window_aggregate_plan(
ctx: IOxSessionContext,
table_name: &str,
schema: &Schema,
predicate: &Predicate,
@ -1363,14 +1293,10 @@ impl InfluxRpcPlanner {
offset: WindowDuration,
chunks: Vec<Arc<dyn QueryChunk>>,
) -> Result<SeriesSetPlan> {
let scan_and_filter = ScanPlanBuilder::new(
Arc::from(table_name),
schema,
ctx.child_ctx("scan_and_filter planning"),
)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
let scan_and_filter = ScanPlanBuilder::new(Arc::from(table_name), schema)
.with_predicate(predicate)
.with_chunks(chunks)
.build()?;
let schema = scan_and_filter.provider.iox_schema();
@ -1562,13 +1488,7 @@ async fn create_plans<F, P>(
f: F,
) -> Result<Vec<P>>
where
F: for<'a> Fn(
&'a IOxSessionContext,
&'a str,
&'a Predicate,
Vec<Arc<dyn QueryChunk>>,
&'a Schema,
) -> Result<P>
F: for<'a> Fn(&'a str, &'a Predicate, Vec<Arc<dyn QueryChunk>>, &'a Schema) -> Result<P>
+ Clone
+ Send
+ Sync,
@ -1602,7 +1522,7 @@ where
table_name: table_name.as_ref(),
})?;
f(&ctx, table_name, predicate, chunks, &schema)
f(table_name, predicate, chunks, &schema)
}
})
.try_collect()


@ -75,7 +75,6 @@ impl QuerierNamespace {
schema: cached_table.schema.clone(),
ingester_connection: ingester_connection.clone(),
chunk_adapter: Arc::clone(&chunk_adapter),
exec: Arc::clone(&exec),
prune_metrics: Arc::clone(&prune_metrics),
}));


@ -8,7 +8,7 @@ use crate::{
use data_types::{ColumnId, DeletePredicate, NamespaceId, PartitionId, ShardIndex, TableId};
use datafusion::error::DataFusionError;
use futures::join;
use iox_query::{exec::Executor, provider, provider::ChunkPruner, QueryChunk};
use iox_query::{provider, provider::ChunkPruner, QueryChunk};
use observability_deps::tracing::{debug, trace};
use predicate::Predicate;
use schema::Schema;
@ -79,7 +79,6 @@ pub struct QuerierTableArgs {
pub schema: Schema,
pub ingester_connection: Option<Arc<dyn IngesterConnection>>,
pub chunk_adapter: Arc<ChunkAdapter>,
pub exec: Arc<Executor>,
pub prune_metrics: Arc<PruneMetrics>,
}
@ -114,9 +113,6 @@ pub struct QuerierTable {
/// Interface to create chunks for this table.
chunk_adapter: Arc<ChunkAdapter>,
/// Executor for queries.
exec: Arc<Executor>,
/// Metrics for chunk pruning.
prune_metrics: Arc<PruneMetrics>,
}
@ -134,7 +130,6 @@ impl QuerierTable {
schema,
ingester_connection,
chunk_adapter,
exec,
prune_metrics,
} = args;
@ -148,7 +143,6 @@ impl QuerierTable {
schema,
ingester_connection,
chunk_adapter,
exec,
prune_metrics,
}
}


@ -11,8 +11,7 @@ use datafusion::{
prelude::Expr,
};
use iox_query::{
exec::{ExecutorType, SessionContextIOxExt},
influxdb_iox_pre_6098_planner,
exec::SessionContextIOxExt,
provider::{ChunkPruner, Error as ProviderError, ProviderBuilder},
pruning::{prune_chunks, NotPrunedReason, PruningObserver},
QueryChunk,
@ -51,13 +50,9 @@ impl TableProvider for QuerierTable {
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
// build provider out of all chunks
// TODO: push down some predicates to catalog
let iox_ctx = self.exec.new_context_from_df(ExecutorType::Query, ctx);
let mut builder = ProviderBuilder::new(
Arc::clone(self.table_name()),
self.schema().clone(),
iox_ctx,
);
let mut builder =
ProviderBuilder::new(Arc::clone(self.table_name()), self.schema().clone());
let pruning_predicate = filters
.iter()
@ -88,13 +83,7 @@ impl TableProvider for QuerierTable {
&self,
_filter: &Expr,
) -> Result<TableProviderFilterPushDown, DataFusionError> {
if influxdb_iox_pre_6098_planner() {
// we may apply filtering (via pruning) but can not guarantee
// that the filter catches all row during scan
Ok(TableProviderFilterPushDown::Inexact)
} else {
Ok(TableProviderFilterPushDown::Exact)
}
Ok(TableProviderFilterPushDown::Exact)
}
}


@ -59,7 +59,6 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
schema,
ingester_connection: Some(create_ingester_connection_for_testing()),
chunk_adapter,
exec: catalog.exec(),
prune_metrics: Arc::new(PruneMetrics::new(&catalog.metric_registry())),
})
}