From fd8a87484ef79300245caa7490cf36b3c5ee2ea2 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 8 Jun 2021 13:23:22 -0400
Subject: [PATCH 1/3] feat: Hook up chunk grouping into provider

---
 query/src/lib.rs | 4 +
 query/src/provider.rs | 264 +++++++++++++++++++++++++++--------------
 query/src/pruning.rs | 15 +++
 query/src/test.rs | 99 +++++++++++++++-
 server/src/db/chunk.rs | 7 ++
 5 files changed, 299 insertions(+), 90 deletions(-)

diff --git a/query/src/lib.rs b/query/src/lib.rs
index 4573988a04..197c96ca4a 100644
--- a/query/src/lib.rs
+++ b/query/src/lib.rs
@@ -66,6 +66,10 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
 /// Returns the name of the table stored in this chunk
 fn table_name(&self) -> &str;

+ /// Returns true if the chunk may contain a duplicate "primary
+ /// key" within itself
+ fn may_contain_pk_duplicates(&self) -> bool;
+
 /// Returns the result of applying the `predicate` to the chunk
 /// using an efficient, but inexact method, based on metadata.
 ///
diff --git a/query/src/provider.rs b/query/src/provider.rs
index aa3978827d..7f5867ea6f 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -2,7 +2,7 @@
 use std::sync::Arc;

-use arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use arrow::{datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
 use datafusion::{
 datasource::{
 datasource::{Statistics, TableProviderFilterPushDown},
@@ -16,6 +16,7 @@ use internal_types::schema::{builder::SchemaMerger, Schema};
 use observability_deps::tracing::debug;

 use crate::{
+ duplicate::group_potential_duplicates,
 predicate::{Predicate, PredicateBuilder},
 util::project_schema,
 PartitionChunk,
@@ -58,9 +59,26 @@ pub enum Error {
 InternalPushdownPredicate {
 source: datafusion::error::DataFusionError,
 },
+
+ #[snafu(display("Internal error: Can not group chunks '{}'", source,))]
+ InternalChunkGrouping { source: crate::duplicate::Error },
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;

+impl From<Error> for ArrowError {
+ // Wrap an error into an arrow error
+ fn from(e: Error) -> Self {
+ Self::ExternalError(Box::new(e))
+ }
+}
+
+impl From<Error> for DataFusionError {
+ // Wrap an error into a datafusion error
+ fn from(e: Error) -> Self {
+ Self::ArrowError(e.into())
+ }
+}
+
 /// Something that can prune chunks based on their metadata
 pub trait ChunkPruner: Sync + Send + std::fmt::Debug {
 /// prune `chunks`, if possible, based on predicate.
@@ -249,7 +267,7 @@ impl TableProvider for ChunkTableProvider {
 scan_schema,
 chunks,
 predicate,
- );
+ )?;

 Ok(plan)
 }
@@ -303,38 +321,38 @@ impl Deduplicater {
 /// The final UnionExec on top is to union the streams below. If there is only one stream, UnionExec
 /// will not be added into the plan.
 /// ```text
- /// ┌─────────────────┐
- /// │ UnionExec │
- /// │ │
- /// └─────────────────┘
- /// ▲
- /// │
- /// ┌──────────────────────┴───────────┬─────────────────────┐
- /// │ │ │
- /// │ │ │
+ /// ┌─────────────────┐
+ /// │ UnionExec │
+ /// │ │
+ /// └─────────────────┘
+ /// ▲
+ /// │
+ /// ┌──────────────────────┴───────────┬─────────────────────┐
+ /// │ │ │
+ /// │ │ │
 /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
 /// │ DeduplicateExec │ │ DeduplicateExec │ │IOxReadFilterNode│
 /// └─────────────────┘ └─────────────────┘ │ (Chunk 4) │
 /// ▲ ▲ └─────────────────┘
- /// │ │
- /// ┌───────────────────────┐ │
- /// │SortPreservingMergeExec│ │
- /// └───────────────────────┘ │
+ /// │ │
+ /// ┌───────────────────────┐ │
+ /// │SortPreservingMergeExec│ │
+ /// └───────────────────────┘ │
 /// ▲ |
 /// │ |
- /// ┌───────────┴───────────┐ │
- /// │ │ │
- /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
- /// │ SortExec │ │ SortExec │ │ SortExec │
- /// │ (optional) │ │ (optional) │ │ (optional) │
- /// └─────────────────┘ └─────────────────┘ └─────────────────┘
- /// ▲ ▲ ▲
- /// │ │ │
- /// │ │ │
- /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
- /// │IOxReadFilterNode│ │IOxReadFilterNode│ │IOxReadFilterNode│
- /// │ (Chunk 1) │ │ (Chunk 2) │ │ (Chunk 3) │
- /// └─────────────────┘ └─────────────────┘ └─────────────────┘
+ /// ┌───────────┴───────────┐ │
+ /// │ │ │
+ /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
+ /// │ SortExec │ │ SortExec │ │ SortExec │
+ /// │ (optional) │ │ (optional) │ │ (optional) │
+ /// └─────────────────┘ └─────────────────┘ └─────────────────┘
+ /// ▲ ▲ ▲
+ /// │ │ │
+ /// │ │ │
+ /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
+ /// │IOxReadFilterNode│ │IOxReadFilterNode│ │IOxReadFilterNode│
+ /// │ (Chunk 1) │ │ (Chunk 2) │ │ (Chunk 3) │
+ /// └─────────────────┘ └─────────────────┘ └─────────────────┘
 ///```

 fn build_scan_plan(
@@ -343,9 +361,18 @@ impl Deduplicater {
 schema: ArrowSchemaRef,
 chunks: Vec<Arc<C>>,
 predicate: Predicate,
- ) -> Arc<dyn ExecutionPlan> {
- //finding overlapped chunks and put them into the right group
- self.split_overlapped_chunks(chunks.to_vec());
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // find overlapped chunks and put them into the right group
+ self.split_overlapped_chunks(chunks.to_vec())?;
+
+ // TEMP until the rest of this module's code is complete:
+ // merge all chunks into the same group
+ self.no_duplicates_chunks
+ .append(&mut self.in_chunk_duplicates_chunks);
+ for mut group in self.overlapped_chunks_set.iter_mut() {
+ self.no_duplicates_chunks.append(&mut group);
+ }
+ self.overlapped_chunks_set.clear();

 // Building plans
 let mut plans = vec![];
@@ -396,22 +423,32 @@
 // There are still plans, add UnionExec
 if !plans.is_empty() {
 // final_plan = union_plan
- panic!("Unexpected error: There should be only one output for scan plan");
+ panic!("Unexpected error: There should be only one output for scan plan, but there were: {:#?}", plans);
 }

- final_plan
+ Ok(final_plan)
 }

 /// discover overlaps and split them into three groups:
 /// 1. vector of vectors of overlapped chunks
 /// 2. vector of non-overlapped chunks, each of which may have duplicates within itself
 /// 3. vector of non-overlapped chunks without duplicates
- fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) {
- // TODO: need to discover overlaps and split them
- // The current behavior is just like neither overlaps nor having duplicates in its own chunk
- //self.overlapped_chunks_set = vec![];
- //self.in_chunk_duplicates_chunks = vec![];
- self.no_duplicates_chunks.append(&mut chunks.to_vec());
+ fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) -> Result<()> {
+ // Find all groups based on statistics
+ let groups = group_potential_duplicates(chunks).context(InternalChunkGrouping)?;
+
+ for mut group in groups {
+ if group.len() == 1 {
+ if group[0].may_contain_pk_duplicates() {
+ self.in_chunk_duplicates_chunks.append(&mut group);
+ } else {
+ self.no_duplicates_chunks.append(&mut group);
+ }
+ } else {
+ self.overlapped_chunks_set.push(group)
+ }
+ }
+ Ok(())
 }

 /// Return true if no chunks overlap with one another and none has duplicates within itself
@@ -422,29 +459,29 @@
 /// Return deduplicate plan for the given overlapped chunks
 /// The plan will look like this
 /// ```text
- /// ┌─────────────────┐
- /// │ DeduplicateExec │
- /// └─────────────────┘
- /// ▲
- /// │
- /// ┌───────────────────────┐
- /// │SortPreservingMergeExec│
- /// └───────────────────────┘
- /// ▲
- /// │
- /// ┌───────────┴───────────┐
- /// │ │
- /// ┌─────────────────┐ ┌─────────────────┐
- /// │ SortExec │ ... │ SortExec │
- /// │ (optional) │ │ (optional) │
- /// └─────────────────┘ └─────────────────┘
- /// ▲ ▲
- /// │ ... │
- /// │ │
- /// ┌─────────────────┐ ┌─────────────────┐
- /// │IOxReadFilterNode│ │IOxReadFilterNode│
- /// │ (Chunk 1) │ ... │ (Chunk n) │
- /// └─────────────────┘ └─────────────────┘
+ /// ┌─────────────────┐
+ /// │ DeduplicateExec │
+ /// └─────────────────┘
+ /// ▲
+ /// │
+ /// ┌───────────────────────┐
+ /// │SortPreservingMergeExec│
+ /// └───────────────────────┘
+ /// ▲
+ /// │
+ /// ┌───────────┴───────────┐
+ /// │ │
+ /// ┌─────────────────┐ ┌─────────────────┐
+ /// │ SortExec │ ... │ SortExec │
+ /// │ (optional) │ │ (optional) │
+ /// └─────────────────┘ └─────────────────┘
+ /// ▲ ▲
+ /// │ ... │
+ /// │ │
+ /// ┌─────────────────┐ ┌─────────────────┐
+ /// │IOxReadFilterNode│ │IOxReadFilterNode│
+ /// │ (Chunk 1) │ ... │ (Chunk n) │
+ /// └─────────────────┘ └─────────────────┘
 ///```
 fn build_deduplicate_plan_for_overlapped_chunks(
 table_name: Arc<String>,
@@ -465,22 +502,22 @@ impl Deduplicater {
 /// Return deduplicate plan for a given chunk with duplicates
 /// The plan will look like this
 /// ```text
- /// ┌─────────────────┐
- /// │ DeduplicateExec │
- /// └─────────────────┘
- /// ▲
- /// │
- /// ┌─────────────────┐
- /// │ SortExec │
- /// │ (optional) │
- /// └─────────────────┘
- /// ▲
- /// │
- /// │
- /// ┌─────────────────┐
- /// │IOxReadFilterNode│
- /// │ (Chunk) │
- /// └─────────────────┘
+ /// ┌─────────────────┐
+ /// │ DeduplicateExec │
+ /// └─────────────────┘
+ /// ▲
+ /// │
+ /// ┌─────────────────┐
+ /// │ SortExec │
+ /// │ (optional) │
+ /// └─────────────────┘
+ /// ▲
+ /// │
+ /// │
+ /// ┌─────────────────┐
+ /// │IOxReadFilterNode│
+ /// │ (Chunk) │
+ /// └─────────────────┘
 ///```
 fn build_deduplicate_plan_for_chunk_with_duplicates(
 table_name: Arc<String>,
@@ -500,10 +537,10 @@
 /// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
 /// ```text
- /// ┌─────────────────┐
- /// │IOxReadFilterNode│
- /// │ (Chunk) │
- /// └─────────────────┘
+ /// ┌─────────────────┐
+ /// │IOxReadFilterNode│
+ /// │ (Chunk) │
+ /// └─────────────────┘
 ///```
 fn build_plan_for_non_duplicates_chunk(
 table_name: Arc<String>,
@@ -521,10 +558,10 @@
 /// Return the simplest IOx scan plan for many chunks which is IOxReadFilterNode
 /// ```text
- /// ┌─────────────────┐
- /// │IOxReadFilterNode│
- /// │ (Chunk) │
- /// └─────────────────┘
+ /// ┌─────────────────┐
+ /// │IOxReadFilterNode│
+ /// │ (Chunk) │
+ /// └─────────────────┘
 ///```
 fn build_plans_for_non_duplicates_chunk(
 table_name: Arc<String>,
@@ -549,3 +586,58 @@ impl ChunkPruner for NoOpPruner {
 chunks
 }
 }
+
+#[cfg(test)]
+mod test {
+ use crate::test::TestChunk;
+
+ use super::*;
+
+ #[test]
+ fn chunk_grouping() {
+ // This test just ensures that all the plumbing is connected
+ // for chunk grouping. The logic of the grouping is tested
+ // in the duplicate module
+
+ // c1: no overlaps
+ let c1 = Arc::new(TestChunk::new(1).with_tag_column_with_stats("t", "tag1", "a", "b"));
+
+ // c2: overlaps with c3
+ let c2 = Arc::new(TestChunk::new(2).with_tag_column_with_stats("t", "tag1", "c", "d"));
+
+ // c3: overlaps with c2
+ let c3 = Arc::new(TestChunk::new(3).with_tag_column_with_stats("t", "tag1", "c", "d"));
+
+ // c4: self overlap
+ let c4 = Arc::new(
+ TestChunk::new(4)
+ .with_tag_column_with_stats("t", "tag1", "e", "f")
+ .with_may_contain_pk_duplicates(true),
+ );
+
+ let mut deduplicator = Deduplicater::new();
+ deduplicator
+ .split_overlapped_chunks(vec![c1, c2, c3, c4])
+ .expect("split chunks");
+
+ assert_eq!(
+ chunk_group_ids(&deduplicator.overlapped_chunks_set),
+ vec!["Group 0: 2, 3"]
+ );
+ assert_eq!(chunk_ids(&deduplicator.in_chunk_duplicates_chunks), "4");
+ assert_eq!(chunk_ids(&deduplicator.no_duplicates_chunks), "1");
+ }
+
+ fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
+ let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<_>>();
+ ids.join(", ")
+ }
+
+ fn chunk_group_ids(groups: &[Vec<Arc<TestChunk>>]) -> Vec<String> {
+ groups
+ .iter()
+ .enumerate()
+ .map(|(idx, group)| format!("Group {}: {}", idx, chunk_ids(group)))
+ .collect()
+ }
+}
diff --git a/query/src/pruning.rs b/query/src/pruning.rs
index 850ae65700..4e49afa957 100644
--- a/query/src/pruning.rs
+++ b/query/src/pruning.rs
@@ -1,4 +1,6 @@
 //! Implementation of statistics based pruning
+use std::sync::Arc;
+
 use arrow::{array::ArrayRef, datatypes::SchemaRef};
 use data_types::partition_metadata::{ColumnSummary, Statistics, TableSummary};
 use datafusion::{
@@ -20,6 +22,19 @@ pub trait Prunable: Sized {
 fn schema(&self) -> SchemaRef;
 }

+impl<P> Prunable for Arc<P>
+where
+ P: Prunable,
+{
+ fn summary(&self) -> &TableSummary {
+ self.as_ref().summary()
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.as_ref().schema()
+ }
+}
+
 /// Something that cares to be notified when pruning of chunks occurs
 pub trait PruningObserver {
 type Observed;
diff --git a/query/src/test.rs b/query/src/test.rs
index 5348368bc5..067d0926b2 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -8,7 +8,10 @@ use arrow::{
 datatypes::{DataType, Int32Type, TimeUnit},
 record_batch::RecordBatch,
 };
-use data_types::{chunk_metadata::ChunkSummary, partition_metadata::TableSummary};
+use data_types::{
+ chunk_metadata::ChunkSummary,
+ partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
+};
 use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream};

 use crate::{
@@ -20,7 +23,7 @@ use crate::{exec::Executor, pruning::Prunable};
 use internal_types::{
 schema::{
 builder::{SchemaBuilder, SchemaMerger},
- Schema,
+ InfluxColumnType, Schema,
 },
 selection::Selection,
 };
@@ -119,6 +122,9 @@ impl Database for TestDatabase {
 pub struct TestChunk {
 id: u32,

+ /// Set the flag if this chunk might contain duplicates
+ may_contain_pk_duplicates: bool,
+
 /// A copy of the captured predicates passed
 predicates: Mutex<Vec<Predicate>>,
@@ -136,6 +142,9 @@ pub struct TestChunk {
 /// Return value for apply_predicate, if desired
 predicate_match: Option<PredicateMatch>,
+
+ /// Return value for summary(), if desired
+ table_summary: Option<TableSummary>,
 }

 impl TestChunk {
@@ -173,6 +182,12 @@ impl TestChunk {
 self.with_tag_column(table_name, "dummy_col")
 }

+ /// Set the `may_contain_pk_duplicates` flag
+ pub fn with_may_contain_pk_duplicates(mut self, v: bool) -> Self {
+ self.may_contain_pk_duplicates = v;
+ self
+ }
+
 /// Register a tag column with the test chunk
 pub fn with_tag_column(
 self,
@@ -189,6 +204,38 @@ impl TestChunk {
 self.add_schema_to_table(table_name, new_column_schema)
 }

+ /// Register a tag column with stats with the test chunk
+ pub fn with_tag_column_with_stats(
+ self,
+ table_name: impl Into<String>,
+ column_name: impl Into<String>,
+ min: &str,
+ max: &str,
+ ) -> Self {
+ let table_name = table_name.into();
+ let column_name = column_name.into();
+
+ let mut new_self = self.with_tag_column(&table_name, &column_name);
+
+ // Now, find the appropriate column summary and update the stats
+ let column_summary: &mut ColumnSummary = new_self
+ .table_summary
+ .as_mut()
+ .expect("had table summary")
+ .columns
+ .iter_mut()
+ .find(|c| &c.name == &column_name)
+ .expect("had column");
+
+ column_summary.stats = Statistics::String(StatValues {
+ min: Some(min.to_string()),
+ max: Some(max.to_string()),
+ ..Default::default()
+ });
+
+ new_self
+ }
+
 /// Register a timestamp column with the test chunk
 pub fn with_time_column(self, table_name: impl Into<String>) -> Self {
 let table_name = table_name.into();
@@ -226,7 +273,37 @@ impl TestChunk {
 if let Some(existing_name) = &self.table_name {
 assert_eq!(&table_name, existing_name);
 }
- self.table_name = Some(table_name);
+ self.table_name = Some(table_name.clone());
+
+ // assume the new schema has exactly a single column
+ assert_eq!(new_column_schema.len(), 1);
+ let (col_type, new_field) = new_column_schema.field(0);
+
+ let influxdb_type = col_type.map(|t| match t {
+ InfluxColumnType::Tag => InfluxDbType::Tag,
+ InfluxColumnType::Field(_) => InfluxDbType::Field,
+ InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
+ });
+
+ let stats = match new_field.data_type() {
+ DataType::Boolean => Statistics::Bool(StatValues::default()),
+ DataType::Int64 => Statistics::I64(StatValues::default()),
+ DataType::UInt64 => Statistics::U64(StatValues::default()),
+ DataType::Utf8 => Statistics::String(StatValues::default()),
+ DataType::Dictionary(_, value_type) => {
+ assert!(matches!(**value_type, DataType::Utf8));
+ Statistics::String(StatValues::default())
+ }
+ DataType::Float64 => Statistics::F64(StatValues::default()),
+ DataType::Timestamp(_, _) => Statistics::I64(StatValues::default()),
+ _ => panic!("Unsupported type in TestChunk: {:?}", new_field.data_type()),
+ };
+
+ let column_summary = ColumnSummary {
+ name: new_field.name().clone(),
+ influxdb_type,
+ stats,
+ };

 let mut merger = SchemaMerger::new().merge(new_column_schema).unwrap();
@@ -238,6 +315,14 @@ impl TestChunk {
 let new_schema = merger.build().unwrap();

 self.table_schema = Some(new_schema);
+
+ let mut table_summary = self
+ .table_summary
+ .take()
+ .unwrap_or_else(|| TableSummary::new(table_name));
+ table_summary.columns.push(column_summary);
+ self.table_summary = Some(table_summary);
+
 self
 }

@@ -320,6 +405,10 @@ impl PartitionChunk for TestChunk {
 self.table_name.as_deref().unwrap()
 }

+ fn may_contain_pk_duplicates(&self) -> bool {
+ self.may_contain_pk_duplicates
+ }
+
 fn read_filter(
 &self,
 predicate: &Predicate,
@@ -403,7 +492,9 @@ impl PartitionChunk for TestChunk {

 impl Prunable for TestChunk {
 fn summary(&self) -> &TableSummary {
- unimplemented!();
+ self.table_summary
+ .as_ref()
+ .expect("Table summary not configured for TestChunk")
 }

 fn schema(&self) -> arrow::datatypes::SchemaRef {
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index bcb571196b..6fe64ddcc8 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -217,6 +217,13 @@ impl PartitionChunk for DbChunk {
 self.table_name.as_ref()
 }

+ fn may_contain_pk_duplicates(&self) -> bool {
+ // Assume that the MUB can contain duplicates as it has the
+ // raw incoming stream of writes, but that all other types of
+ // chunks are deduplicated as part of creation
+ matches!(self.state, State::MutableBuffer { .. })
+ }
+
 fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
 if !predicate.should_include_table(self.table_name().as_ref()) {
 return Ok(PredicateMatch::Zero);

From b23c4e5210e5ed68e6537178079ea89a779ed373 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 8 Jun 2021 14:44:48 -0400
Subject: [PATCH 2/3] fix: clippy

---
 query/src/provider.rs | 2 +-
 query/src/test.rs | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/query/src/provider.rs b/query/src/provider.rs
index 7f5867ea6f..074a396e7f 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -369,7 +369,7 @@ impl Deduplicater {
 // merge all chunks into the same group
 self.no_duplicates_chunks
 .append(&mut self.in_chunk_duplicates_chunks);
- for mut group in self.overlapped_chunks_set.iter_mut() {
+ for mut group in &mut self.overlapped_chunks_set {
 self.no_duplicates_chunks.append(&mut group);
 }
 self.overlapped_chunks_set.clear();
diff --git a/query/src/test.rs b/query/src/test.rs
index 067d0926b2..ff5d50a14f 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -224,7 +224,7 @@ impl TestChunk {
 .expect("had table summary")
 .columns
 .iter_mut()
- .find(|c| &c.name == &column_name)
+ .find(|c| c.name == column_name)
 .expect("had column");

 column_summary.stats = Statistics::String(StatValues {

From 666204d4a84d39258c2ee6580694ac8ad9dba885 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 8 Jun 2021 14:46:55 -0400
Subject: [PATCH 3/3] fix: remove whitespace changes

---
 query/src/provider.rs | 146 +++++++++++++++++++++---------------------
 1 file changed, 73 insertions(+), 73 deletions(-)

diff --git a/query/src/provider.rs b/query/src/provider.rs
index 074a396e7f..63490df170 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -321,38 +321,38 @@ impl Deduplicater {
 /// The final UnionExec on top is to union the streams below. If there is only one stream, UnionExec
 /// will not be added into the plan.
/// ```text - /// ┌─────────────────┐ - /// │ UnionExec │ - /// │ │ - /// └─────────────────┘ - /// ▲ - /// │ - /// ┌──────────────────────┴───────────┬─────────────────────┐ - /// │ │ │ - /// │ │ │ + /// ┌─────────────────┐ + /// │ UnionExec │ + /// │ │ + /// └─────────────────┘ + /// ▲ + /// │ + /// ┌──────────────────────┴───────────┬─────────────────────┐ + /// │ │ │ + /// │ │ │ /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ /// │ DeduplicateExec │ │ DeduplicateExec │ │IOxReadFilterNode│ /// └─────────────────┘ └─────────────────┘ │ (Chunk 4) │ /// ▲ ▲ └─────────────────┘ - /// │ │ - /// ┌───────────────────────┐ │ - /// │SortPreservingMergeExec│ │ - /// └───────────────────────┘ │ + /// │ │ + /// ┌───────────────────────┐ │ + /// │SortPreservingMergeExec│ │ + /// └───────────────────────┘ │ /// ▲ | /// │ | - /// ┌───────────┴───────────┐ │ - /// │ │ │ - /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ - /// │ SortExec │ │ SortExec │ │ SortExec │ - /// │ (optional) │ │ (optional) │ │ (optional) │ - /// └─────────────────┘ └─────────────────┘ └─────────────────┘ - /// ▲ ▲ ▲ - /// │ │ │ - /// │ │ │ - /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ - /// │IOxReadFilterNode│ │IOxReadFilterNode│ │IOxReadFilterNode│ - /// │ (Chunk 1) │ │ (Chunk 2) │ │ (Chunk 3) │ - /// └─────────────────┘ └─────────────────┘ └─────────────────┘ + /// ┌───────────┴───────────┐ │ + /// │ │ │ + /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ + /// │ SortExec │ │ SortExec │ │ SortExec │ + /// │ (optional) │ │ (optional) │ │ (optional) │ + /// └─────────────────┘ └─────────────────┘ └─────────────────┘ + /// ▲ ▲ ▲ + /// │ │ │ + /// │ │ │ + /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ + /// │IOxReadFilterNode│ │IOxReadFilterNode│ │IOxReadFilterNode│ + /// │ (Chunk 1) │ │ (Chunk 2) │ │ (Chunk 3) │ + /// └─────────────────┘ └─────────────────┘ └─────────────────┘ ///``` fn build_scan_plan( @@ -459,29 +459,29 @@ impl Deduplicater { /// Return deduplicate plan for the given overlapped chunks /// The plan will look like this /// ```text - /// ┌─────────────────┐ - /// │ DeduplicateExec │ - /// └─────────────────┘ - /// ▲ - /// │ - /// ┌───────────────────────┐ - /// │SortPreservingMergeExec│ - /// └───────────────────────┘ - /// ▲ - /// │ - /// ┌───────────┴───────────┐ - /// │ │ - /// ┌─────────────────┐ ┌─────────────────┐ - /// │ SortExec │ ... │ SortExec │ - /// │ (optional) │ │ (optional) │ - /// └─────────────────┘ └─────────────────┘ - /// ▲ ▲ - /// │ ... │ - /// │ │ - /// ┌─────────────────┐ ┌─────────────────┐ - /// │IOxReadFilterNode│ │IOxReadFilterNode│ - /// │ (Chunk 1) │ ... │ (Chunk n) │ - /// └─────────────────┘ └─────────────────┘ + /// ┌─────────────────┐ + /// │ DeduplicateExec │ + /// └─────────────────┘ + /// ▲ + /// │ + /// ┌───────────────────────┐ + /// │SortPreservingMergeExec│ + /// └───────────────────────┘ + /// ▲ + /// │ + /// ┌───────────┴───────────┐ + /// │ │ + /// ┌─────────────────┐ ┌─────────────────┐ + /// │ SortExec │ ... │ SortExec │ + /// │ (optional) │ │ (optional) │ + /// └─────────────────┘ └─────────────────┘ + /// ▲ ▲ + /// │ ... │ + /// │ │ + /// ┌─────────────────┐ ┌─────────────────┐ + /// │IOxReadFilterNode│ │IOxReadFilterNode│ + /// │ (Chunk 1) │ ... 
+ /// └─────────────────┘ └─────────────────┘
 ///```
 fn build_deduplicate_plan_for_overlapped_chunks(
 table_name: Arc<String>,
@@ -502,22 +502,22 @@
 /// Return deduplicate plan for a given chunk with duplicates
 /// The plan will look like this
 /// ```text
- /// ┌─────────────────┐
- /// │ DeduplicateExec │
- /// └─────────────────┘
- /// ▲
- /// │
- /// ┌─────────────────┐
- /// │ SortExec │
- /// │ (optional) │
- /// └─────────────────┘
- /// ▲
- /// │
- /// │
- /// ┌─────────────────┐
- /// │IOxReadFilterNode│
- /// │ (Chunk) │
- /// └─────────────────┘
+ /// ┌─────────────────┐
+ /// │ DeduplicateExec │
+ /// └─────────────────┘
+ /// ▲
+ /// │
+ /// ┌─────────────────┐
+ /// │ SortExec │
+ /// │ (optional) │
+ /// └─────────────────┘
+ /// ▲
+ /// │
+ /// │
+ /// ┌─────────────────┐
+ /// │IOxReadFilterNode│
+ /// │ (Chunk) │
+ /// └─────────────────┘
 ///```
 fn build_deduplicate_plan_for_chunk_with_duplicates(
 table_name: Arc<String>,
@@ -537,10 +537,10 @@
 /// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
 /// ```text
- /// ┌─────────────────┐
- /// │IOxReadFilterNode│
- /// │ (Chunk) │
- /// └─────────────────┘
+ /// ┌─────────────────┐
+ /// │IOxReadFilterNode│
+ /// │ (Chunk) │
+ /// └─────────────────┘
 ///```
 fn build_plan_for_non_duplicates_chunk(
 table_name: Arc<String>,
@@ -558,10 +558,10 @@
 /// Return the simplest IOx scan plan for many chunks which is IOxReadFilterNode
 /// ```text
- /// ┌─────────────────┐
- /// │IOxReadFilterNode│
- /// │ (Chunk) │
- /// └─────────────────┘
+ /// ┌─────────────────┐
+ /// │IOxReadFilterNode│
+ /// │ (Chunk) │
+ /// └─────────────────┘
 ///```
 fn build_plans_for_non_duplicates_chunk(
 table_name: Arc<String>,
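
The routing policy that `split_overlapped_chunks` implements in PATCH 1 is easy to lose in the diff noise, so here is a minimal, self-contained sketch of the same idea. Everything in it (`Chunk`, `overlaps`, `group_chunks`) is a simplified stand-in invented for illustration: in the patch the grouping is done by `group_potential_duplicates` over full per-column statistics, and the three buckets hold `Arc`-wrapped `PartitionChunk` implementations, not this toy struct.

```rust
/// A chunk as seen by the grouping step: an id, the min/max of a single
/// "primary key" column (stand-in for real column statistics), and the
/// self-duplication flag reported by `may_contain_pk_duplicates`.
#[derive(Debug)]
struct Chunk {
    id: u32,
    min: &'static str,
    max: &'static str,
    may_contain_pk_duplicates: bool,
}

/// Two chunks *may* share a primary key if their [min, max] ranges intersect.
fn overlaps(a: &Chunk, b: &Chunk) -> bool {
    a.min <= b.max && b.min <= a.max
}

/// Greedy grouping: each chunk joins the first group it overlaps, otherwise
/// it starts a new group. (A chunk that bridges two existing groups should
/// cause them to merge; that case is omitted here for brevity.)
fn group_chunks(chunks: Vec<Chunk>) -> Vec<Vec<Chunk>> {
    let mut groups: Vec<Vec<Chunk>> = vec![];
    for chunk in chunks {
        // Use position() rather than iter_mut().find() so no mutable
        // borrow of `groups` is held when a brand-new group is pushed.
        match groups
            .iter()
            .position(|g| g.iter().any(|c| overlaps(c, &chunk)))
        {
            Some(i) => groups[i].push(chunk),
            None => groups.push(vec![chunk]),
        }
    }
    groups
}

fn main() {
    // Same shape as the `chunk_grouping` test: c2/c3 share a key range,
    // c4 may duplicate within itself, c1 is clean.
    let chunks = vec![
        Chunk { id: 1, min: "a", max: "b", may_contain_pk_duplicates: false },
        Chunk { id: 2, min: "c", max: "d", may_contain_pk_duplicates: false },
        Chunk { id: 3, min: "c", max: "d", may_contain_pk_duplicates: false },
        Chunk { id: 4, min: "e", max: "f", may_contain_pk_duplicates: true },
    ];

    let mut overlapped_chunks_set: Vec<Vec<Chunk>> = vec![];
    let mut in_chunk_duplicates_chunks: Vec<Chunk> = vec![];
    let mut no_duplicates_chunks: Vec<Chunk> = vec![];

    // The routing step of `split_overlapped_chunks`: a singleton "group"
    // is not really overlapped, so it is re-routed based on whether the
    // chunk may contain duplicates within itself.
    for mut group in group_chunks(chunks) {
        if group.len() == 1 {
            let chunk = group.pop().expect("singleton group");
            if chunk.may_contain_pk_duplicates {
                in_chunk_duplicates_chunks.push(chunk);
            } else {
                no_duplicates_chunks.push(chunk);
            }
        } else {
            overlapped_chunks_set.push(group);
        }
    }

    println!("overlapped:       {:?}", overlapped_chunks_set);      // chunks 2 and 3
    println!("self-duplicating: {:?}", in_chunk_duplicates_chunks); // chunk 4
    println!("duplicate-free:   {:?}", no_duplicates_chunks);       // chunk 1
}
```

Run standalone, this prints the same split that the `chunk_grouping` test asserts: chunks 2 and 3 form one overlapped group (each needs the `SortPreservingMergeExec`/`DeduplicateExec` subplan), chunk 4 lands in the self-duplicating bucket (per-chunk `DeduplicateExec`), and chunk 1 in the duplicate-free bucket (plain `IOxReadFilterNode`).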