diff --git a/Cargo.lock b/Cargo.lock
index 19bc80ef81..200bd6c04b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -885,6 +885,7 @@ name = "data_types"
 version = "0.1.0"
 dependencies = [
  "chrono",
+ "datafusion 0.1.0",
  "influxdb_line_protocol",
  "observability_deps",
  "percent-encoding",
diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml
index bfd1331a30..b6ac0259c6 100644
--- a/data_types/Cargo.toml
+++ b/data_types/Cargo.toml
@@ -8,6 +8,7 @@ readme = "README.md"
 
 [dependencies] # In alphabetical order
 chrono = { version = "0.4", features = ["serde"] }
+datafusion = { path = "../datafusion" }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
 percent-encoding = "2.1.0"
 regex = "1.4"
diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs
index 48395bc9a7..91c260b882 100644
--- a/data_types/src/partition_metadata.rs
+++ b/data_types/src/partition_metadata.rs
@@ -1,6 +1,9 @@
 //! This module contains structs that describe the metadata for a partition
 //! including schema, summary statistics, and file locations in storage.
 
+use datafusion::arrow::compute::SortOptions;
+use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
+
 use std::{borrow::Cow, cmp::Ordering, mem};
 
 use serde::{Deserialize, Serialize};
@@ -173,6 +176,25 @@ impl TableSummary {
 
         key_summaries
     }
+
+    /// Returns the primary key columns as Arrow physical sort expressions used to sort data
+    pub fn arrow_pk_sort_exprs(key_summaries: Vec<&ColumnSummary>) -> Vec<PhysicalSortExpr> {
+        //let key_summaries = self.primary_key_columns();
+
+        // Build one sort expression per primary key column
+        let mut sort_exprs = vec![];
+        for key in key_summaries {
+            sort_exprs.push(PhysicalSortExpr {
+                expr: col(key.name.as_str()),
+                options: SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                },
+            });
+        }
+
+        sort_exprs
+    }
 }
 
 // Replicate this enum here as it can't be derived from the existing statistics
@@ -202,7 +224,7 @@ pub struct ColumnSummary {
     /// Column's Influx data model type (if any)
     pub influxdb_type: Option<InfluxDbType>,
 
-    /// Per column statistics
+    /// Per column
     pub stats: Statistics,
 }
 
diff --git a/query/src/provider.rs b/query/src/provider.rs
index 5622675025..39d7cd5ba7 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -2,7 +2,8 @@
 
 use std::sync::Arc;
 
-use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
+use arrow::{datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
+use data_types::partition_metadata::TableSummary;
 use datafusion::{
     datasource::{
         datasource::{Statistics, TableProviderFilterPushDown},
@@ -11,9 +12,8 @@ use datafusion::{
     error::{DataFusionError, Result as DataFusionResult},
     logical_plan::Expr,
     physical_plan::{
-        expressions::{col, PhysicalSortExpr},
-        sort::SortExec,
-        ExecutionPlan,
+        expressions::PhysicalSortExpr, sort::SortExec,
+        sort_preserving_merge::SortPreservingMergeExec, union::UnionExec, ExecutionPlan,
     },
 };
 use internal_types::schema::{merge::SchemaMerger, Schema};
@@ -373,7 +373,7 @@ impl Deduplicater {
                 Arc::clone(&schema),
                 overlapped_chunks.to_owned(),
                 predicate.clone(),
-            ));
+            )?);
         }
 
         // Go over each in_chunk_duplicates_chunks, build deduplicate plan for each
@@ -447,6 +447,11 @@ impl Deduplicater {
    ///  ┌───────────────────────┐
    ///  │SortPreservingMergeExec│
    ///  └───────────────────────┘
+   ///              ▲
+   ///              │
+   ///  ┌───────────────────────┐
+   ///  │       UnionExec       │
+   ///  └───────────────────────┘
    ///              ▲
    ///              │
    ///  ┌───────────┴───────────┐
@@ -468,15 +473,38 @@ impl Deduplicater {
         schema: ArrowSchemaRef,
         chunks: Vec<Arc<C>>, // These chunks are identified overlapped
         predicate: Predicate,
-    ) -> Arc<dyn ExecutionPlan> {
-        // TODO
-        // Currently return just like there are no overlaps, no duplicates
-        Arc::new(IOxReadFilterNode::new(
-            Arc::clone(&table_name),
-            schema,
-            chunks,
-            predicate,
-        ))
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Build sort plan for each chunk
+        let mut sorted_chunk_plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
+
+        for chunk in &chunks {
+            sorted_chunk_plans.push(Self::build_sort_plan_for_read_filter(
+                Arc::clone(&table_name),
+                Arc::clone(&schema),
+                Arc::clone(&chunk),
+                predicate.clone(),
+            )?);
+        }
+
+        // TODO: build the primary key by accumulating unique key columns from each chunk's table summary;
+        // use the one from the first chunk for now
+        let key_summaries = chunks[0].summary().primary_key_columns();
+
+        // Union the plans
+        // The UnionExec operator streams all chunks (aka partitions in DataFusion) and
+        // keeps them as separate partitions, which is exactly what we need here
+        let plan = UnionExec::new(sorted_chunk_plans);
+
+        // Now (sort) merge the already sorted chunks
+        let sort_exprs = TableSummary::arrow_pk_sort_exprs(key_summaries);
+        let plan = Arc::new(SortPreservingMergeExec::new(
+            sort_exprs.clone(),
+            Arc::new(plan),
+            1024,
+        ));
+
+        // Add DeduplicateExec
+        Self::add_deduplicate_node(sort_exprs, Ok(plan))
     }
 
     /// Return deduplicate plan for a given chunk with duplicates
@@ -504,6 +532,54 @@ impl Deduplicater {
         schema: ArrowSchemaRef,
         chunk: Arc<C>, // This chunk is identified having duplicates
         predicate: Predicate,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Create the two bottom nodes: IOxReadFilterNode and SortExec
+        let plan = Self::build_sort_plan_for_read_filter(
+            table_name,
+            schema,
+            Arc::clone(&chunk),
+            predicate,
+        );
+
+        // Add DeduplicateExec
+        // Sort exprs for the deduplication
+        let key_summaries = chunk.summary().primary_key_columns();
+        let sort_exprs = TableSummary::arrow_pk_sort_exprs(key_summaries);
+        Self::add_deduplicate_node(sort_exprs, plan)
+    }
+
+    // Hooks DeduplicateExec on top of the given input plan
+    fn add_deduplicate_node(
+        _sort_exprs: Vec<PhysicalSortExpr>,
+        input: Result<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // TODOs for when DeduplicateExec is built
+        // Ticket https://github.com/influxdata/influxdb_iox/issues/1646
+
+        // Currently simply return the input plan
+        input
+    }
+
+    /// Return a sort plan for a given chunk
+    /// The plan will look like this
+    /// ```text
+    ///    ┌─────────────────┐
+    ///    │    SortExec     │
+    ///    │   (optional)    │
+    ///    └─────────────────┘
+    ///             ▲
+    ///             │
+    ///             │
+    ///    ┌─────────────────┐
+    ///    │IOxReadFilterNode│
+    ///    │     (Chunk)     │
+    ///    └─────────────────┘
+    ///```
+    fn build_sort_plan_for_read_filter(
+        table_name: Arc<str>,
+        schema: ArrowSchemaRef,
+        chunk: Arc<C>, // This chunk is identified as having duplicates
+        predicate: Predicate,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Create the bottom node IOxReadFilterNode for this chunk
         let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
@@ -514,14 +590,7 @@ impl Deduplicater {
         ));
 
         // Add the sort operator, SortExec, if needed
-        //let plan = Self::build_sort_plan(chunk, input);
         Self::build_sort_plan(chunk, input)
-
-        // Create DeduplicateExc
-        // TODO: Add DeuplicateExec here when it is implemented in https://github.com/influxdata/influxdb_iox/issues/1646
-        //plan = add_deduplicate_exec(plan);
-
-        //plan
     }
 
     /// Add SortExec operator on top of the input plan of the given chunk
@@ -534,20 +603,8 @@ impl Deduplicater {
             return Ok(input);
         }
 
-        // Sort the chunk on pk
         let key_summaries = chunk.summary().primary_key_columns();
-
-        // build sort expression
-        let mut sort_exprs = vec![];
-        for key in key_summaries {
-            sort_exprs.push(PhysicalSortExpr {
-                expr: col(key.name.as_str()),
-                options: SortOptions {
-                    descending: false,
-                    nulls_first: false,
-                },
-            });
-        }
+        let sort_exprs = TableSummary::arrow_pk_sort_exprs(key_summaries);
 
         // Create SortExec operator
         Ok(Arc::new(
@@ -580,7 +637,7 @@ impl Deduplicater {
     /// ```text
     ///    ┌─────────────────┐
     ///    │IOxReadFilterNode│
-    ///    │     (Chunk)     │
+    ///    │  (Many Chunks)  │
     ///    └─────────────────┘
     ///```
     fn build_plans_for_non_duplicates_chunk(
@@ -761,6 +818,98 @@ mod test {
         assert_batches_eq!(&expected, &batch);
     }
 
+    #[tokio::test]
+    async fn sort_read_filter_plan_for_two_tags_with_time() {
+        // Chunk 1 with 5 rows of data
+        let chunk = Arc::new(
+            TestChunk::new(1)
+                .with_time_column("t")
+                .with_tag_column("t", "tag1")
+                .with_tag_column("t", "tag2")
+                .with_int_field_column("t", "field_int")
+                .with_five_rows_of_data("t"),
+        );
+
+        // DataFusion schema of the chunk
+        let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
+
+        let sort_plan = Deduplicater::build_sort_plan_for_read_filter(
+            Arc::from("t"),
+            schema,
+            Arc::clone(&chunk),
+            Predicate::default(),
+        );
+        let batch = collect(sort_plan.unwrap()).await.unwrap();
+        // Data is now sorted on the primary key (tag1, tag2, time)
+        let expected = vec![
+            "+-----------+------+------+-------------------------------+",
+            "| field_int | tag1 | tag2 | time                          |",
+            "+-----------+------+------+-------------------------------+",
+            "| 100       | AL   | MA   | 1970-01-01 00:00:00.000000050 |",
+            "| 70        | CT   | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 5         | MT   | AL   | 1970-01-01 00:00:00.000005    |",
+            "| 10        | MT   | AL   | 1970-01-01 00:00:00.000007    |",
+            "| 1000      | MT   | CT   | 1970-01-01 00:00:00.000001    |",
+            "+-----------+------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
+    #[tokio::test]
+    async fn deduplicate_plan_for_overlapped_chunks() {
+        // Chunk 1 with 5 rows of data on 2 tags
+        let chunk1 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column("t")
+                .with_tag_column("t", "tag1")
+                .with_tag_column("t", "tag2")
+                .with_int_field_column("t", "field_int")
+                .with_five_rows_of_data("t"),
+        );
+
+        // Chunk 2 is exactly the same as Chunk 1
+        let chunk2 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column("t")
+                .with_tag_column("t", "tag1")
+                .with_tag_column("t", "tag2")
+                .with_int_field_column("t", "field_int")
+                .with_five_rows_of_data("t"),
+        );
+
+        // DataFusion schema of the chunk
+        // (the same for both chunks)
+        let schema = chunk1.table_schema(Selection::All).unwrap().as_arrow();
+
+        let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
+            Arc::from("t"),
+            schema,
+            vec![chunk1, chunk2],
+            Predicate::default(),
+        );
+        let batch = collect(sort_plan.unwrap()).await.unwrap();
+        // Data is now sorted on the primary key (tag1, tag2, time)
+
+        // NOTE: When full deduplication is implemented, the duplicates will be removed from this output
+        let expected = vec![
+            "+-----------+------+------+-------------------------------+",
+            "| field_int | tag1 | tag2 | time                          |",
+            "+-----------+------+------+-------------------------------+",
+            "| 100       | AL   | MA   | 1970-01-01 00:00:00.000000050 |",
+            "| 100       | AL   | MA   | 1970-01-01 00:00:00.000000050 |",
+            "| 70        | CT   | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 5         | MT   | AL   | 1970-01-01 00:00:00.000005    |",
+            "| 5         | MT   | AL   | 1970-01-01 00:00:00.000005    |",
+            "| 10        | MT   | AL   | 1970-01-01 00:00:00.000007    |",
+            "| 10        | MT   | AL   | 1970-01-01 00:00:00.000007    |",
+            "| 1000      | MT   | CT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | CT   | 1970-01-01 00:00:00.000001    |",
+            "+-----------+------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
     fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
         let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<String>>();
         ids.join(", ")
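
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the plan shape that build_deduplicate_plan_for_overlapped_chunks now produces. Per-chunk inputs that are already sorted on the primary key are wrapped in a UnionExec, which keeps them as separate partitions, and then merged by a SortPreservingMergeExec driven by the same ascending / nulls-last PhysicalSortExpr list that TableSummary::arrow_pk_sort_exprs builds. The sketch uses MemoryExec as a stand-in for IOxReadFilterNode and a single i64 "time" column as the key; it assumes a DataFusion version whose APIs match the calls in this diff (UnionExec::new, the one-argument col, SortPreservingMergeExec::new(exprs, input, batch_size), async collect) plus tokio for the runtime.

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::{collect, ExecutionPlan};

/// Build a one-column RecordBatch whose `time` values are already sorted.
fn sorted_batch(schema: &Arc<Schema>, times: Vec<i64>) -> RecordBatch {
    RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(Int64Array::from(times))]).unwrap()
}

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("time", DataType::Int64, false)]));

    // Stand-ins for two per-chunk scans that are already sorted on the key
    // (the real plan uses IOxReadFilterNode, optionally topped by SortExec).
    let scan1: Arc<dyn ExecutionPlan> = Arc::new(MemoryExec::try_new(
        &[vec![sorted_batch(&schema, vec![1, 5, 7])]],
        Arc::clone(&schema),
        None,
    )?);
    let scan2: Arc<dyn ExecutionPlan> = Arc::new(MemoryExec::try_new(
        &[vec![sorted_batch(&schema, vec![2, 5, 9])]],
        Arc::clone(&schema),
        None,
    )?);

    // UnionExec keeps the two inputs as separate partitions ...
    let union = UnionExec::new(vec![scan1, scan2]);

    // ... and SortPreservingMergeExec interleaves those partitions on the key,
    // using the same options arrow_pk_sort_exprs emits (ascending, nulls last).
    let sort_exprs = vec![PhysicalSortExpr {
        expr: col("time"),
        options: SortOptions {
            descending: false,
            nulls_first: false,
        },
    }];
    let plan = Arc::new(SortPreservingMergeExec::new(sort_exprs, Arc::new(union), 1024));

    // One globally ordered stream: 1, 2, 5, 5, 7, 9. Duplicate keys survive until
    // DeduplicateExec (issue #1646) is added on top of this merge.
    let batches = collect(plan).await?;
    println!("{:?}", batches);
    Ok(())
}
```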
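The reason for merging instead of re-sorting: each chunk is (or is made) sorted on the primary key by build_sort_plan_for_read_filter, so SortPreservingMergeExec only has to interleave the already-sorted partitions rather than sort the concatenated data again, and the planned DeduplicateExec (https://github.com/influxdata/influxdb_iox/issues/1646) can then remove rows with duplicate keys by comparing only adjacent rows of that single sorted stream.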