diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs index 84bd682f6b..c9d2696abb 100644 --- a/data_types/src/partition_metadata.rs +++ b/data_types/src/partition_metadata.rs @@ -98,6 +98,15 @@ impl TableSummary { } } + /// Returns the names of the columns that make up the primary key of this table + pub fn primary_key(&self) -> Vec { + self.columns + .iter() + .filter(|c| c.key_part()) + .map(|c| c.name.clone()) + .collect() + } + /// Returns the total number of rows in the columns of this summary pub fn count(&self) -> u64 { // Assumes that all tables have the same number of rows, so @@ -186,6 +195,15 @@ impl ColumnSummary { self.stats.type_name() } + /// Return true if this column is part of the primary key, which + /// means it is either a tag or a timestamp + pub fn key_part(&self) -> bool { + matches!( + self.influxdb_type, + Some(InfluxDbType::Tag) | Some(InfluxDbType::Timestamp) + ) + } + /// Return size in bytes of this Column metadata (not the underlying column) pub fn size(&self) -> usize { mem::size_of::() + self.name.len() + self.stats.size() diff --git a/query/src/provider.rs b/query/src/provider.rs index e8a020ffa5..1f7bc299e4 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -58,6 +58,11 @@ pub enum Error { InternalPushdownPredicate { source: datafusion::error::DataFusionError, }, + + #[snafu(display("Internal error while looking for overlapped chunks '{}'", source,))] + InternalSplitOvelappedChunks { + source: datafusion::error::DataFusionError, + }, } pub type Result = std::result::Result; @@ -243,10 +248,15 @@ impl TableProvider for ChunkTableProvider { // Figure out the schema of the requested output let scan_schema = project_schema(self.arrow_schema(), projection); - let plan = - IOxReadFilterNode::new(Arc::clone(&self.table_name), scan_schema, chunks, predicate); + let mut deduplicate = Deduplicater::new(); + let plan = deduplicate.build_scan_plan( + Arc::clone(&self.table_name), + scan_schema, + chunks, + predicate, + ); - 
Ok(Arc::new(plan)) + Ok(plan) } fn statistics(&self) -> Statistics { @@ -262,6 +272,282 @@ impl TableProvider for ChunkTableProvider { } } +#[derive(Clone, Debug, Default)] +/// A deduplicater that deduplicates the duplicated data during scan execution +pub(crate) struct Deduplicater { + // a vector of vectors of overlapped chunks + pub overlapped_chunks_set: Vec>>, + + // a vector of non-overlapped chunks, each of which has duplicates in itself + pub in_chunk_duplicates_chunks: Vec>, + + // a vector of chunks that neither overlap nor contain duplicates + pub no_duplicates_chunks: Vec>, +} + +impl Deduplicater { + fn new() -> Self { + Self { + overlapped_chunks_set: vec![], + in_chunk_duplicates_chunks: vec![], + no_duplicates_chunks: vec![], + } + } + + /// The IOx scan process needs to deduplicate data if there are duplicates. Hence it will look + /// like this. In this example, there are 4 chunks. + /// . Chunks 1 and 2 overlap and need to get deduplicated. This includes these main steps: + /// i. Read/scan/stream the chunk: IOxReadFilterNode. + /// ii. Sort each chunk if it is not sorted yet: SortExec. + /// iii. Merge the sorted chunks into one stream: SortPreservingMergeExec. + /// iv. Deduplicate the sorted stream: DeduplicateExec + /// . Chunk 3 does not overlap with others but has duplicates in itself, hence it only needs to get + /// sorted if needed, then deduplicated. + /// . Chunk 4 neither overlaps with other chunks nor has duplicates in itself, hence it does not + /// need any extra work besides chunk reading. + /// The final UnionExec on top is to union the streams below. If there is only one stream, UnionExec + /// will not be added into the plan. 
+ /// ```text + /// ┌─────────────────┐ + /// │ UnionExec │ + /// │ │ + /// └─────────────────┘ + /// ▲ + /// │ + /// ┌──────────────────────┴───────────┬─────────────────────┐ + /// │ │ │ + /// │ │ │ + /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ + /// │ DeduplicateExec │ │ DeduplicateExec │ │IOxReadFilterNode│ + /// └─────────────────┘ └─────────────────┘ │ (Chunk 4) │ + /// ▲ ▲ └─────────────────┘ + /// │ │ + /// ┌───────────────────────┐ │ + /// │SortPreservingMergeExec│ │ + /// └───────────────────────┘ │ + /// ▲ | + /// │ | + /// ┌───────────┴───────────┐ │ + /// │ │ │ + /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ + /// │ SortExec │ │ SortExec │ │ SortExec │ + /// │ (optional) │ │ (optional) │ │ (optional) │ + /// └─────────────────┘ └─────────────────┘ └─────────────────┘ + /// ▲ ▲ ▲ + /// │ │ │ + /// │ │ │ + /// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ + /// │IOxReadFilterNode│ │IOxReadFilterNode│ │IOxReadFilterNode│ + /// │ (Chunk 1) │ │ (Chunk 2) │ │ (Chunk 3) │ + /// └─────────────────┘ └─────────────────┘ └─────────────────┘ + ///``` + + fn build_scan_plan( + &mut self, + table_name: Arc, + schema: ArrowSchemaRef, + chunks: Vec>, + predicate: Predicate, + ) -> Arc { + //predicate: Predicate,) -> std::result::Result, DataFusionError> { + + // Find overlapped chunks and put each chunk into the right group + self.split_overlapped_chunks(chunks.to_vec()); + + // Build plans + let mut plans = vec![]; + if self.no_duplicates() { + // Neither overlaps nor duplicates: no deduplication needed + let plan = Self::build_plans_for_non_duplicates_chunk( + Arc::clone(&table_name), + Arc::clone(&schema), + chunks, + predicate, + ); + plans.push(plan); + } else { + // Go over the overlapped set, build a deduplicate plan for each vector of overlapped chunks + for overlapped_chunks in self.overlapped_chunks_set.to_vec() { + plans.push(Self::build_deduplicate_plan_for_overlapped_chunks( + Arc::clone(&table_name), + 
Arc::clone(&schema), + overlapped_chunks.to_owned(), + predicate.clone(), + )); + } + + // Go over in_chunk_duplicates_chunks, build a deduplicate plan for each chunk + for chunk_with_duplicates in self.in_chunk_duplicates_chunks.to_vec() { + plans.push(Self::build_deduplicate_plan_for_chunk_with_duplicates( + Arc::clone(&table_name), + Arc::clone(&schema), + chunk_with_duplicates.to_owned(), + predicate.clone(), + )); + } + + // Go over no_duplicates_chunks, build a plan for each chunk + for no_duplicates_chunk in self.no_duplicates_chunks.to_vec() { + plans.push(Self::build_plan_for_non_duplicates_chunk( + Arc::clone(&table_name), + Arc::clone(&schema), + no_duplicates_chunk.to_owned(), + predicate.clone(), + )); + } + } + + let final_plan = plans.remove(0); + + // TODO + // If plans are still left here, add a UnionExec over them (until then they are dropped) + if !plans.is_empty() { + // final_plan = union_plan + // .... + } + + final_plan + } + + /// Discover overlaps and split the chunks into three groups: + /// 1. vector of vectors of overlapped chunks + /// 2. vector of non-overlapped chunks, each of which has duplicates in itself + /// 3. 
vectors of non-overlapped chunks without duplicates + fn split_overlapped_chunks(&mut self, chunks: Vec>) { + // TODO: need to discover overlaps and split them + // Current behavior: every chunk is treated as neither overlapping nor having duplicates in itself + //self.overlapped_chunks_set = vec![]; + //self.in_chunk_duplicates_chunks = vec![]; + self.no_duplicates_chunks.append(&mut chunks.to_vec()); + } + + /// Return true if `overlapped_chunks_set` and `in_chunk_duplicates_chunks` are both empty + fn no_duplicates(&self) -> bool { + self.overlapped_chunks_set.is_empty() && self.in_chunk_duplicates_chunks.is_empty() + } + + /// Return the deduplicate plan for the given overlapped chunks + /// The plan will look like this + /// ```text + /// ┌─────────────────┐ + /// │ DeduplicateExec │ + /// └─────────────────┘ + /// ▲ + /// │ + /// ┌───────────────────────┐ + /// │SortPreservingMergeExec│ + /// └───────────────────────┘ + /// ▲ + /// │ + /// ┌───────────┴───────────┐ + /// │ │ + /// ┌─────────────────┐ ┌─────────────────┐ + /// │ SortExec │ ... │ SortExec │ + /// │ (optional) │ │ (optional) │ + /// └─────────────────┘ └─────────────────┘ + /// ▲ ▲ + /// │ ... │ + /// │ │ + /// ┌─────────────────┐ ┌─────────────────┐ + /// │IOxReadFilterNode│ │IOxReadFilterNode│ + /// │ (Chunk 1) │ ... 
│ (Chunk n) │ + /// └─────────────────┘ └─────────────────┘ + ///``` + fn build_deduplicate_plan_for_overlapped_chunks( + table_name: Arc, + schema: ArrowSchemaRef, + chunks: Vec>, // These chunks are identified as overlapped + predicate: Predicate, + ) -> Arc { + // TODO + // Currently returns a plain IOxReadFilterNode, as if there were no overlaps and no duplicates + Arc::new(IOxReadFilterNode::new( + Arc::clone(&table_name), + schema, + chunks, + predicate, + )) + } + + /// Return the deduplicate plan for a given chunk with duplicates + /// The plan will look like this + /// ```text + /// ┌─────────────────┐ + /// │ DeduplicateExec │ + /// └─────────────────┘ + /// ▲ + /// │ + /// ┌─────────────────┐ + /// │ SortExec │ + /// │ (optional) │ + /// └─────────────────┘ + /// ▲ + /// │ + /// │ + /// ┌─────────────────┐ + /// │IOxReadFilterNode│ + /// │ (Chunk) │ + /// └─────────────────┘ + ///``` + fn build_deduplicate_plan_for_chunk_with_duplicates( + table_name: Arc, + schema: ArrowSchemaRef, + chunk: Arc, // This chunk is identified as having duplicates + predicate: Predicate, + ) -> Arc { + // TODO + // Currently returns a plain IOxReadFilterNode, as if there were no overlaps and no duplicates + Arc::new(IOxReadFilterNode::new( + Arc::clone(&table_name), + schema, + vec![chunk], + predicate, + )) + } + + /// Return the simplest IOx scan plan for a given chunk, which is an IOxReadFilterNode + /// ```text + /// ┌─────────────────┐ + /// │IOxReadFilterNode│ + /// │ (Chunk) │ + /// └─────────────────┘ + ///``` + fn build_plan_for_non_duplicates_chunk( + table_name: Arc, + schema: ArrowSchemaRef, + chunk: Arc, // This chunk is identified as having no duplicates + predicate: Predicate, + ) -> Arc { + Arc::new(IOxReadFilterNode::new( + Arc::clone(&table_name), + schema, + vec![chunk], + predicate, + )) + } + + /// Return the simplest IOx scan plan for many chunks, which is a single IOxReadFilterNode + /// ```text + /// ┌─────────────────┐ + /// │IOxReadFilterNode│ + /// │ (Chunk) │ + /// └─────────────────┘ + ///``` + fn 
build_plans_for_non_duplicates_chunk( + table_name: Arc, + schema: ArrowSchemaRef, + chunks: Vec>, // These chunks are identified as having no duplicates + predicate: Predicate, + ) -> Arc { + Arc::new(IOxReadFilterNode::new( + Arc::clone(&table_name), + schema, + chunks, + predicate, + )) + } +} + #[derive(Debug)] /// A pruner that does not do pruning (suitable if no additional pruning is possible) struct NoOpPruner {} diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 4f43e53932..18883a1c6c 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -49,6 +49,12 @@ pub struct ChunkMetadata { pub schema: Arc, } +impl ChunkMetadata { + pub fn primary_key(&self) -> Vec { + self.table_summary.primary_key() + } +} + /// Different memory representations of a frozen chunk. #[derive(Debug)] pub enum ChunkStageFrozenRepr { diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index bcb571196b..8cee1cc333 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -165,6 +165,10 @@ impl DbChunk { }) } + pub fn primary_key(&self) -> Vec { + self.meta.primary_key() + } + /// Return the snapshot of the chunk with type ParquetFile /// This function should be only invoked when you know your chunk /// is ParquetFile type whose state is WrittenToObjectStore. The