From 40cb4f741f8b7a7465ee2cbc8d5c079e4e0db71f Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Tue, 8 Jun 2021 13:17:36 -0400 Subject: [PATCH 1/8] feat: initial implementaton --- data_types/src/partition_metadata.rs | 6 ++ query/src/lib.rs | 6 ++ query/src/provider.rs | 154 ++++++++++++++++++++++++++- server/src/db/chunk.rs | 15 +++ 4 files changed, 179 insertions(+), 2 deletions(-) diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs index 84bd682f6b..1de80d4a4f 100644 --- a/data_types/src/partition_metadata.rs +++ b/data_types/src/partition_metadata.rs @@ -218,6 +218,12 @@ impl ColumnSummary { (Statistics::String(_), _) => unreachable!(), } } + + /// Returns true if this column is a part of the primary key which + /// is either Tag or Timestamp + pub fn is_key_part(&self) -> bool { + matches!(self.influxdb_type, Some(InfluxDbType::Tag) | Some(InfluxDbType::Timestamp)) + } } /// Column name, statistics which encode type information diff --git a/query/src/lib.rs b/query/src/lib.rs index bc6f702452..2d119f1a6b 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -117,6 +117,12 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync { predicate: &Predicate, selection: Selection<'_>, ) -> Result; + + /// Returns true if this chunk has duplicates + fn has_duplicates(&self) -> bool; + + /// Returns true if data of this chunk is sorted + fn is_sorted(&self) -> bool; } #[async_trait] diff --git a/query/src/provider.rs b/query/src/provider.rs index aa3978827d..d859bf8818 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef as ArrowSchemaRef; +// use data_types::partition_metadata::ColumnSummary; use datafusion::{ datasource::{ datasource::{Statistics, TableProviderFilterPushDown}, @@ -490,14 +491,163 @@ impl Deduplicater { ) -> Arc { // // TODO // // Currently return just like there are no overlaps, no duplicates - Arc::new(IOxReadFilterNode::new( + + // Create the bottom node IOxRedFilterNode for this chunk + let input = Arc::new(IOxReadFilterNode::new( Arc::clone(&table_name), schema, vec![chunk], predicate, - )) + )); + + + // Add the sort operator, SortExec, if needed + if !chunk.is_sorted() { + // Create SortExec plan + let input_schema = input.as_ref().schema(); + //let key_summaries: Vec; // Todo: waiting fro alamb to merge + + } + + // Create DeduplicateExc + + // + input } + + ////////// + + // let sort_expr = expr + // .iter() + // .map(|e| match e { + // Expr::Sort { + // expr, + // asc, + // nulls_first, + // } => self.create_physical_sort_expr( + // expr, + // &input_schema, + // SortOptions { + // descending: !*asc, + // nulls_first: *nulls_first, + // }, + // ctx_state, + // ), + // _ => Err(DataFusionError::Plan( + // "Sort only accepts sort expressions".to_string(), + // )), + // }) + // .collect::>>()?; + + // Ok(Arc::new(SortExec::try_new(sort_expr, input)?)) + + /////////// + /////// TEST: create SortExec plan + // #[tokio::test] + // async fn test_lex_sort_by_float() -> Result<()> { + // let schema = Arc::new(Schema::new(vec![ + // Field::new("a", DataType::Float32, true), + // Field::new("b", DataType::Float64, true), + // ])); + + // // define data. + // let batch = RecordBatch::try_new( + // schema.clone(), + // vec![ + // Arc::new(Float32Array::from(vec![ + // Some(f32::NAN), + // None, + // None, + // Some(f32::NAN), + // Some(1.0_f32), + // Some(1.0_f32), + // Some(2.0_f32), + // Some(3.0_f32), + // ])), + // Arc::new(Float64Array::from(vec![ + // Some(200.0_f64), + // Some(20.0_f64), + // Some(10.0_f64), + // Some(100.0_f64), + // Some(f64::NAN), + // None, + // None, + // Some(f64::NAN), + // ])), + // ], + // )?; + + // let sort_exec = Arc::new(SortExec::try_new( + // vec![ + // PhysicalSortExpr { + // expr: col("a"), + // options: SortOptions { + // descending: true, + // nulls_first: true, + // }, + // }, + // PhysicalSortExpr { + // expr: col("b"), + // options: SortOptions { + // descending: false, + // nulls_first: false, + // }, + // }, + // ], + // Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?), + // )?); + + // assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type()); + // assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type()); + + // let result: Vec = collect(sort_exec.clone()).await?; + // assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0); + // assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8); + // assert_eq!(result.len(), 1); + + // let columns = result[0].columns(); + + // assert_eq!(DataType::Float32, *columns[0].data_type()); + // assert_eq!(DataType::Float64, *columns[1].data_type()); + + // let a = as_primitive_array::(&columns[0]); + // let b = as_primitive_array::(&columns[1]); + + // // convert result to strings to allow comparing to expected result containing NaN + // let result: Vec<(Option, Option)> = (0..result[0].num_rows()) + // .map(|i| { + // let aval = if a.is_valid(i) { + // Some(a.value(i).to_string()) + // } else { + // None + // }; + // let bval = if b.is_valid(i) { + // Some(b.value(i).to_string()) + // } else { + // None + // }; + // (aval, bval) + // }) + // .collect(); + + // let expected: Vec<(Option, Option)> = vec![ + // (None, Some("10".to_owned())), + // (None, Some("20".to_owned())), + // (Some("NaN".to_owned()), Some("100".to_owned())), + // (Some("NaN".to_owned()), Some("200".to_owned())), + // (Some("3".to_owned()), Some("NaN".to_owned())), + // (Some("2".to_owned()), None), + // (Some("1".to_owned()), Some("NaN".to_owned())), + // (Some("1".to_owned()), None), + // ]; + + // assert_eq!(expected, result); + + // Ok(()) + // } + ///////////// + /// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode /// ```text /// ┌─────────────────┐ diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index bcb571196b..4084bd8a5e 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -442,6 +442,21 @@ impl PartitionChunk for DbChunk { } } } + + fn has_duplicates(&self) -> bool { + match &self.state { + State::MutableBuffer { .. } => true, + State::ReadBuffer { .. } => true, // TODO: should be false after compaction + State::ParquetFile { .. } => true, // TODO: should be false after compaction + } + + // TODOs: return the right value. For now the chunk is assumed to be not sorted + fn is_sorted(&self) -> bool { + match &self.state { + State::MutableBuffer { .. } => false, + State::ReadBuffer { .. } => false, + State::ParquetFile { .. } => false, + } } impl Prunable for DbChunk { From 68e3a2121f739ecda544d7fa6a324f66351ee875 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Tue, 8 Jun 2021 15:04:31 -0400 Subject: [PATCH 2/8] feat: add SortExec --- data_types/src/partition_metadata.rs | 6 --- query/src/lib.rs | 3 +- query/src/provider.rs | 56 ++++++++++++++++++++-------- query/src/test.rs | 13 ++++++- server/src/db/chunk.rs | 7 +++- 5 files changed, 60 insertions(+), 25 deletions(-) diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs index 63e197e2fe..48395bc9a7 100644 --- a/data_types/src/partition_metadata.rs +++ b/data_types/src/partition_metadata.rs @@ -250,12 +250,6 @@ impl ColumnSummary { (Statistics::String(_), _) => unreachable!(), } } - - /// Returns true if this column is a part of the primary key which - /// is either Tag or Timestamp - pub fn is_key_part(&self) -> bool { - matches!(self.influxdb_type, Some(InfluxDbType::Tag) | Some(InfluxDbType::Timestamp)) - } } /// Column name, statistics which encode type information diff --git a/query/src/lib.rs b/query/src/lib.rs index 8b82b1b945..d6675f55af 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -7,7 +7,7 @@ )] use async_trait::async_trait; -use data_types::chunk_metadata::ChunkSummary; +use data_types::{chunk_metadata::ChunkSummary, partition_metadata::ColumnSummary}; use datafusion::physical_plan::SendableRecordBatchStream; use exec::{stringset::StringSet, Executor}; use internal_types::{schema::Schema, selection::Selection}; @@ -124,6 +124,7 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync { /// Returns true if data of this chunk is sorted fn is_sorted(&self) -> bool; + fn primary_key_columns(&self) -> Vec<&ColumnSummary>; } #[async_trait] diff --git a/query/src/provider.rs b/query/src/provider.rs index d859bf8818..1155e63b0f 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -2,17 +2,12 @@ use std::sync::Arc; -use arrow::datatypes::SchemaRef as ArrowSchemaRef; +use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef}; // use data_types::partition_metadata::ColumnSummary; -use datafusion::{ - datasource::{ +use datafusion::{datasource::{ datasource::{Statistics, TableProviderFilterPushDown}, TableProvider, - }, - error::{DataFusionError, Result as DataFusionResult}, - logical_plan::Expr, - physical_plan::ExecutionPlan, -}; + }, error::{DataFusionError, Result as DataFusionResult}, logical_plan::Expr, physical_plan::{ExecutionPlan, expressions::{PhysicalSortExpr, col}, sort::SortExec}}; use internal_types::schema::{builder::SchemaMerger, Schema}; use observability_deps::tracing::debug; @@ -59,6 +54,11 @@ pub enum Error { InternalPushdownPredicate { source: datafusion::error::DataFusionError, }, + + #[snafu(display("Internal error adding sort operator '{}'", source,))] + InternalSort { + source: datafusion::error::DataFusionError, + }, } pub type Result = std::result::Result; @@ -489,29 +489,47 @@ impl Deduplicater { chunk: Arc, // This chunk is identified having duplicates predicate: Predicate, ) -> Arc { - // // TODO - // // Currently return just like there are no overlaps, no duplicates // Create the bottom node IOxRedFilterNode for this chunk - let input = Arc::new(IOxReadFilterNode::new( + let mut input: Arc = Arc::new(IOxReadFilterNode::new( Arc::clone(&table_name), schema, - vec![chunk], + vec![Arc::clone(&chunk)], predicate, )); // Add the sort operator, SortExec, if needed if !chunk.is_sorted() { - // Create SortExec plan - let input_schema = input.as_ref().schema(); - //let key_summaries: Vec; // Todo: waiting fro alamb to merge + let key_summaries = chunk.primary_key_columns(); + + // build sort expression + let mut sort_exprs = vec![]; + for key in key_summaries { + sort_exprs.push( + PhysicalSortExpr{ + expr: col(key.name.as_str()), + options: SortOptions { + descending: false, + nulls_first: false, + } + } + ); + } + + // Create SortExec operator on top of the IOxReadFilterNode + let sort_exec = SortExec::try_new(sort_exprs, input); + let sort_exec = match sort_exec { + Ok(plan) => plan, + Err(e) => panic!("Internal error while adding SortExec: {}", e) // This should never happens + }; + input = Arc::new(sort_exec); } // Create DeduplicateExc + // TODO: Add DeuplicateExec here when it is implemented in https://github.com/influxdata/influxdb_iox/issues/1646 - // input } @@ -699,3 +717,9 @@ impl ChunkPruner for NoOpPruner { chunks } } + + +#[tokio::test] +async fn test_sort() { + +} \ No newline at end of file diff --git a/query/src/test.rs b/query/src/test.rs index 5348368bc5..1f55770dd8 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -8,7 +8,7 @@ use arrow::{ datatypes::{DataType, Int32Type, TimeUnit}, record_batch::RecordBatch, }; -use data_types::{chunk_metadata::ChunkSummary, partition_metadata::TableSummary}; +use data_types::{chunk_metadata::ChunkSummary, partition_metadata::{ColumnSummary, TableSummary}}; use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}; use crate::{ @@ -334,6 +334,17 @@ impl PartitionChunk for TestChunk { let stream = SizedRecordBatchStream::new(batches[0].schema(), batches); Ok(Box::pin(stream)) } + fn has_duplicates(&self) -> bool { + false + } + + /// Returns true if data of this chunk is sorted + fn is_sorted(&self) -> bool { + false + } + fn primary_key_columns(&self) -> Vec<&ColumnSummary> { + vec![] + } fn apply_predicate(&self, predicate: &Predicate) -> Result { self.check_error()?; diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 4084bd8a5e..d55e5c9944 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -4,7 +4,7 @@ use std::{ }; use arrow::datatypes::SchemaRef; -use data_types::partition_metadata; +use data_types::partition_metadata::{self, ColumnSummary}; use partition_metadata::TableSummary; use snafu::{ResultExt, Snafu}; @@ -448,6 +448,7 @@ impl PartitionChunk for DbChunk { State::MutableBuffer { .. } => true, State::ReadBuffer { .. } => true, // TODO: should be false after compaction State::ParquetFile { .. } => true, // TODO: should be false after compaction + } } // TODOs: return the right value. For now the chunk is assumed to be not sorted @@ -456,6 +457,10 @@ impl PartitionChunk for DbChunk { State::MutableBuffer { .. } => false, State::ReadBuffer { .. } => false, State::ParquetFile { .. } => false, + } + } + fn primary_key_columns(&self) -> Vec<&ColumnSummary> { + self.meta.table_summary.primary_key_columns() } } From ab7d3384b7b15b50a484a79b155a4825c0d41794 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Tue, 8 Jun 2021 21:43:02 -0400 Subject: [PATCH 3/8] refactor: remove unused comments --- query/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/query/src/lib.rs b/query/src/lib.rs index f6ce6901f5..7e9d413883 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -125,7 +125,6 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync { /// Returns true if data of this chunk is sorted fn is_sorted(&self) -> bool; - //fn primary_key_columns(&self) -> Vec<&ColumnSummary>; } #[async_trait] From 3d50ff7a604485b562c296393d512d900dbc8401 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Tue, 8 Jun 2021 21:48:57 -0400 Subject: [PATCH 4/8] refactor: remove comments --- query/src/provider.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/query/src/provider.rs b/query/src/provider.rs index 817ec2f329..058ab895e2 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError}; -// use data_types::partition_metadata::ColumnSummary; use datafusion::{ datasource::{ datasource::{Statistics, TableProviderFilterPushDown}, From 89fcc457f42a49fa09b1e1a6ba70ba9ee0467c6a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 9 Jun 2021 13:46:39 -0400 Subject: [PATCH 5/8] fix: Fix bug in chunk overlap calculation due to nulls (#1669) * fix: Fix bug in chunk overlap calculation due to nulls * docs: add note about algorithmic complexity * fix: avoid recursion in normal case --- query/src/duplicate.rs | 141 +++++++++++++++++++++++++++++++++-------- 1 file changed, 114 insertions(+), 27 deletions(-) diff --git a/query/src/duplicate.rs b/query/src/duplicate.rs index 39687d55c3..6810df0101 100644 --- a/query/src/duplicate.rs +++ b/query/src/duplicate.rs @@ -138,30 +138,95 @@ where } } - /// Returns true if the chunk has a potential primary key overlap with the other chunk + /// Returns true if the chunk has a potential primary key overlap + /// with the other chunk. + /// + /// This this algorithm is O(2^N) in the worst case. However, the + /// pathological case is where two chunks each have a large + /// numbers of tags that have no overlap, which seems unlikely in + /// the real world. + /// + /// Note this algoritm is quite conservative (in that it will + /// assume that any column can contain nulls) and thus can match + /// with chunks that do not have that column. for example + /// + /// Chunk 1: tag_a + /// Chunk 2: tag_a, tag_b + /// + /// In this case Chunk 2 has values for tag_b but Chunk 1 + /// doesn't have any values in tag_b (its values are implicitly + /// null) + /// + /// If Chunk 2 has any null values in the tag_b column, it could + /// overlap with Chunk 1 (as logically there can be rows with + /// (tag_a = NULL, tag_b = NULL) in both chunks + /// + /// We could make this algorithm significantly less conservative + /// if we stored the Null count in the ColumnSummary (and thus + /// could rule out matches with columns that were not present) if + /// there were no NULLs fn potential_overlap(&self, other: &Self) -> Result { - // in order to have overlap, *all* the columns in the sort order - // need to be the same. Note gaps in the sort order mean they - // are for different parts of the keyspace - if self.key_summaries.len() != other.key_summaries.len() { - // Short circuit on different lengths - return Ok(false); - } + // This algorithm assumes that the keys are sorted by name (so + // they can't appear in different orders on the two sides) + debug_assert!(self + .key_summaries + .windows(2) + .all(|s| s[0].name <= s[1].name)); + debug_assert!(other + .key_summaries + .windows(2) + .all(|s| s[0].name <= s[1].name)); + self.potential_overlap_impl(0, other, 0) + } - let iter = self.key_summaries.iter().zip(other.key_summaries.iter()); - for (s1, s2) in iter { - if s1.name != s2.name || !Self::columns_might_overlap(s1, s2)? { - return Ok(false); + // Checks the remainder of self.columns[self_idx..] and + // other.columns[..other_idx] if they are compatible + fn potential_overlap_impl( + &self, + mut self_idx: usize, + other: &Self, + mut other_idx: usize, + ) -> Result { + loop { + let s1 = self.key_summaries.get(self_idx); + let s2 = other.key_summaries.get(other_idx); + + if let (Some(s1), Some(s2)) = (s1, s2) { + if s1.name == s2.name { + // pk matched in this position, so check values. If we + // find no overlap, know this is false, otherwise need to keep checking + if Self::columns_might_overlap(s1, s2)? { + self_idx += 1; + other_idx += 1; + } else { + return Ok(false); + } + } else { + // name didn't match, so try and find the next + // place it does. Since there may be missing keys + // in each side, need to check each in turn + // + // Note this will result in O(num_tags) stack + // frames in the worst case, but we expect the + // number of tags to be relatively small (~20 at + // the time of this writing) + return Ok(self.potential_overlap_impl(self_idx + 1, other, other_idx)? + || self.potential_overlap_impl(self_idx, other, other_idx + 1)?); + } + } else { + // ran out of columns to check on one side, assume the + // other could have nulls all the way down (due to null + // assumption) + return Ok(true); } } - - Ok(true) } /// Returns true if the two columns MAY overlap other, based on /// statistics pub fn columns_might_overlap(s1: &ColumnSummary, s2: &ColumnSummary) -> Result { use Statistics::*; + let overlap = match (&s1.stats, &s2.stats) { (I64(s1), I64(s2)) => s1.overlaps(s2), (U64(s1), U64(s2)) => s1.overlaps(s2), @@ -176,14 +241,15 @@ where } }; - // If either column has no min/max, treat the column as - // being entirely null + // If either column has no min/max, treat the column as being + // entirely null, meaning that it could overlap the other + // stats if it had nulls. let is_none = s1.stats.is_none() || s2.stats.is_none(); match overlap { StatOverlap::NonZero => Ok(true), StatOverlap::Zero => Ok(false), - StatOverlap::Unknown if is_none => Ok(false), // no stats means no values + StatOverlap::Unknown if is_none => Ok(true), // This case means there some stats, but not all. // Unclear how this could happen, so throw an error for now StatOverlap::Unknown => InternalPartialStatistics { @@ -313,7 +379,27 @@ mod test { let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); - let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + // the overlap could come when (tag1 = NULL, tag2=NULL) which + // could exist in either chunk + let expected = vec!["Group 0: [chunk1, chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn different_tag_names_multi_tags() { + // check that if chunks overlap but in different tag names + let c1 = TestChunk::new("chunk1") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_tag("tag2", Some("aaa"), Some("bbb")); + + let c2 = TestChunk::new("chunk2") + .with_tag("tag2", Some("aaa"), Some("bbb")) + .with_tag("tag3", Some("aaa"), Some("bbb")); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + // the overlap could come when (tag1 = NULL, tag2, tag3=NULL) + let expected = vec!["Group 0: [chunk1, chunk2]"]; assert_groups_eq!(expected, groups); } @@ -370,12 +456,13 @@ mod test { let c2 = TestChunk::new("chunk2") // tag1 and timestamp overlap, but no tag2 (aka it is all null) + // so it could overlap if there was a null tag2 value in chunk1 .with_tag("tag1", Some("aaa"), Some("bbb")) .with_timestamp(500, 1000); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); - let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + let expected = vec!["Group 0: [chunk1, chunk2]"]; assert_groups_eq!(expected, groups); } @@ -387,15 +474,15 @@ mod test { .with_timestamp(0, 1000); let c2 = TestChunk::new("chunk2") - // tag1 and timestamp overlap, tag2 has no stats (null) - // so we say they can't overlap + // tag1 and timestamp overlap, tag2 has no stats (is all null) + // so they might overlap if chunk1 had a null in tag 2 .with_tag("tag1", Some("aaa"), Some("bbb")) .with_tag("tag2", None, None) .with_timestamp(500, 1000); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); - let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + let expected = vec!["Group 0: [chunk1, chunk2]"]; assert_groups_eq!(expected, groups); } @@ -445,7 +532,7 @@ mod test { #[test] fn mismatched_types() { - // Test if same column has different types in different + // When the same column has different types in different // chunks; this will likely cause errors elsewhere in practice // as the schemas are incompatible (and can't be merged) let c1 = TestChunk::new("chunk1") @@ -454,14 +541,14 @@ mod test { let c2 = TestChunk::new("chunk2") // tag1 column is actually a field is different in chunk - // 2, so even though the timestamps overlap these chunks - // don't have duplicates + // 2, so since the timestamps overlap these chunks + // might also have duplicates (if tag1 was null in c1) .with_int_field("tag1", Some(100), Some(200)) .with_timestamp(0, 1000); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); - let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + let expected = vec!["Group 0: [chunk1, chunk2]"]; assert_groups_eq!(expected, groups); } @@ -477,7 +564,7 @@ mod test { } /// Mocked out prunable provider to use testing overlaps - #[derive(Debug)] + #[derive(Debug, Clone)] struct TestChunk { // The name of this chunk name: String, From c1c58018fc653280b81b69e83c8ee5763daa6f32 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Wed, 9 Jun 2021 14:17:47 -0400 Subject: [PATCH 6/8] refactor: address review comments --- query/src/lib.rs | 2 +- query/src/provider.rs | 141 ++++++++++++++++++++++++++--------------- query/src/test.rs | 34 ++++++++-- server/src/db/chunk.rs | 2 +- 4 files changed, 118 insertions(+), 61 deletions(-) diff --git a/query/src/lib.rs b/query/src/lib.rs index 7e9d413883..316d85c5db 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -124,7 +124,7 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync { ) -> Result; /// Returns true if data of this chunk is sorted - fn is_sorted(&self) -> bool; + fn is_sorted_on_pk(&self) -> bool; } #[async_trait] diff --git a/query/src/provider.rs b/query/src/provider.rs index 058ab895e2..305218d367 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -534,7 +534,7 @@ impl Deduplicater { chunk: Arc, // This chunk is identified having duplicates predicate: Predicate, ) -> Result> { - // Create the bottom node IOxRedFilterNode for this chunk + // Create the bottom node IOxReadFilterNode for this chunk let input: Arc = Arc::new(IOxReadFilterNode::new( Arc::clone(&table_name), schema, @@ -559,28 +559,29 @@ impl Deduplicater { chunk: Arc, input: Arc, ) -> Result> { - if !chunk.is_sorted() { - let key_summaries = chunk.summary().primary_key_columns(); - - // build sort expression - let mut sort_exprs = vec![]; - for key in key_summaries { - sort_exprs.push(PhysicalSortExpr { - expr: col(key.name.as_str()), - options: SortOptions { - descending: false, - nulls_first: false, - }, - }); - } - - // Create SortExec operator - Ok(Arc::new( - SortExec::try_new(sort_exprs, input).context(InternalSort)?, - )) - } else { - Ok(input) + if chunk.is_sorted_on_pk() { + return Ok(input); } + + // Sort the chunk on pk + let key_summaries = chunk.summary().primary_key_columns(); + + // build sort expression + let mut sort_exprs = vec![]; + for key in key_summaries { + sort_exprs.push(PhysicalSortExpr { + expr: col(key.name.as_str()), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }); + } + + // Create SortExec operator + Ok(Arc::new( + SortExec::try_new(sort_exprs, input).context(InternalSort)?, + )) } /// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode @@ -637,10 +638,9 @@ impl ChunkPruner for NoOpPruner { #[cfg(test)] mod test { - use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arrow_util::assert_batches_eq; use datafusion::physical_plan::collect; - use internal_types::schema::TIME_COLUMN_NAME; + use internal_types::selection::Selection; use crate::test::TestChunk; @@ -682,7 +682,61 @@ mod test { } #[tokio::test] - async fn sort_planning() { + async fn sort_planning_one_tag_with_time() { + // Chunk 1 with 5 rows of data + let chunk = Arc::new( + TestChunk::new(1) + .with_time_column("t") + .with_int_field_column("t", "field_int") + .with_tag_column("t", "tag1") + .with_five_rows_of_data("t"), + ); + + // Datafusion schema of the chunk + let schema = chunk.table_schema(Selection::All).unwrap().as_arrow(); + + // IOx scan operator + let input: Arc = Arc::new(IOxReadFilterNode::new( + Arc::from("t"), + schema, + vec![Arc::clone(&chunk)], + Predicate::default(), + )); + let batch = collect(Arc::clone(&input)).await.unwrap(); + // data in its original non-sorted form + let expected = vec![ + "+------+-----------+-------------------------------+", + "| tag1 | field_int | time |", + "+------+-----------+-------------------------------+", + "| MT | 1000 | 1970-01-01 00:00:00.000001 |", + "| MT | 10 | 1970-01-01 00:00:00.000007 |", + "| CT | 70 | 1970-01-01 00:00:00.000000100 |", + "| AL | 100 | 1970-01-01 00:00:00.000000050 |", + "| MT | 5 | 1970-01-01 00:00:00.000005 |", + "+------+-----------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &batch); + + // Add Sort operator on top of IOx scan + let sort_plan = Deduplicater::build_sort_plan(chunk, input); + let batch = collect(sort_plan.unwrap()).await.unwrap(); + // data is not sorted on primary key(tag1, tag2, time) + let expected = vec![ + "+------+-----------+-------------------------------+", + "| tag1 | field_int | time |", + "+------+-----------+-------------------------------+", + "| AL | 100 | 1970-01-01 00:00:00.000000050 |", + "| CT | 70 | 1970-01-01 00:00:00.000000100 |", + "| MT | 1000 | 1970-01-01 00:00:00.000001 |", + "| MT | 5 | 1970-01-01 00:00:00.000005 |", + "| MT | 10 | 1970-01-01 00:00:00.000007 |", + "+------+-----------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &batch); + } + + #[tokio::test] + async fn sort_planning_two_tags_with_time() { // Chunk 1 with 5 rows of data let chunk = Arc::new( TestChunk::new(1) @@ -690,28 +744,11 @@ mod test { .with_int_field_column("t", "field_int") .with_tag_column("t", "tag2") .with_tag_column("t", "tag1") - .with_five_row_of_null_data("t"), + .with_five_rows_of_data("t"), ); // Datafusion schema of the chunk - let schema = Arc::new(Schema::new(vec![ - Field::new( - "tag1", - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), - true, - ), - Field::new( - "tag2", - DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), - true, - ), - Field::new("field_int", DataType::Int64, true), - Field::new( - TIME_COLUMN_NAME, - DataType::Timestamp(TimeUnit::Nanosecond, None), - true, - ), - ])); + let schema = chunk.table_schema(Selection::All).unwrap().as_arrow(); // IOx scan operator let input: Arc = Arc::new(IOxReadFilterNode::new( @@ -726,11 +763,11 @@ mod test { "+------+------+-----------+-------------------------------+", "| tag1 | tag2 | field_int | time |", "+------+------+-----------+-------------------------------+", - "| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |", - "| MT | MT | 10 | 1970-01-01 00:00:00.000007 |", + "| MT | CT | 1000 | 1970-01-01 00:00:00.000001 |", + "| MT | AL | 10 | 1970-01-01 00:00:00.000007 |", "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |", - "| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |", - "| MT | MT | 5 | 1970-01-01 00:00:00.000005 |", + "| AL | MA | 100 | 1970-01-01 00:00:00.000000050 |", + "| MT | AL | 5 | 1970-01-01 00:00:00.000005 |", "+------+------+-----------+-------------------------------+", ]; assert_batches_eq!(&expected, &batch); @@ -743,11 +780,11 @@ mod test { "+------+------+-----------+-------------------------------+", "| tag1 | tag2 | field_int | time |", "+------+------+-----------+-------------------------------+", - "| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |", + "| AL | MA | 100 | 1970-01-01 00:00:00.000000050 |", "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |", - "| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |", - "| MT | MT | 5 | 1970-01-01 00:00:00.000005 |", - "| MT | MT | 10 | 1970-01-01 00:00:00.000007 |", + "| MT | AL | 5 | 1970-01-01 00:00:00.000005 |", + "| MT | AL | 10 | 1970-01-01 00:00:00.000007 |", + "| MT | CT | 1000 | 1970-01-01 00:00:00.000001 |", "+------+------+-----------+-------------------------------+", ]; assert_batches_eq!(&expected, &batch); diff --git a/query/src/test.rs b/query/src/test.rs index c40a519a3f..3ca14246fe 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -380,7 +380,7 @@ impl TestChunk { /// "| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |", /// "| MT | MT | 5 | 1970-01-01 00:00:00.000005 |", /// "+------+------+-----------+-------------------------------+", - pub fn with_five_row_of_null_data(mut self, _table_name: impl Into) -> Self { + pub fn with_five_rows_of_data(mut self, _table_name: impl Into) -> Self { //let table_name = table_name.into(); let schema = self .table_schema @@ -395,7 +395,14 @@ impl TestChunk { Arc::new(Int64Array::from(vec![1000, 10, 70, 100, 5])) as ArrayRef } DataType::Utf8 => { - Arc::new(StringArray::from(vec!["MA", "MT", "CT", "AL", "MT"])) as ArrayRef + match field.name().as_str() { + "tag1" => Arc::new(StringArray::from(vec!["MT", "MT", "CT", "AL", "MT"])) + as ArrayRef, + "tag2" => Arc::new(StringArray::from(vec!["CT", "AL", "CT", "MA", "AL"])) + as ArrayRef, + _ => Arc::new(StringArray::from(vec!["CT", "MT", "AL", "AL", "MT"])) + as ArrayRef, + } } DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new( TimestampNanosecondArray::from_vec(vec![1000, 7000, 100, 50, 5000], None), @@ -403,9 +410,23 @@ impl TestChunk { DataType::Dictionary(key, value) if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => { - let dict: DictionaryArray = - vec!["MA", "MT", "CT", "AL", "MT"].into_iter().collect(); - Arc::new(dict) as ArrayRef + match field.name().as_str() { + "tag1" => Arc::new( + vec!["MT", "MT", "CT", "AL", "MT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + "tag2" => Arc::new( + vec!["CT", "AL", "CT", "MA", "AL"] + .into_iter() + .collect::>(), + ) as ArrayRef, + _ => Arc::new( + vec!["CT", "MT", "AL", "AL", "MT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + } } _ => unimplemented!( "Unimplemented data type for test database: {:?}", @@ -415,7 +436,6 @@ impl TestChunk { .collect::>(); let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch"); - println!("TestChunk batch data: {:#?}", batch); self.table_data.push(Arc::new(batch)); self @@ -478,7 +498,7 @@ impl PartitionChunk for TestChunk { } /// Returns true if data of this chunk is sorted - fn is_sorted(&self) -> bool { + fn is_sorted_on_pk(&self) -> bool { false } diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 6188061d41..014af96ff8 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -451,7 +451,7 @@ impl PartitionChunk for DbChunk { } // TODOs: return the right value. For now the chunk is assumed to be not sorted - fn is_sorted(&self) -> bool { + fn is_sorted_on_pk(&self) -> bool { match &self.state { State::MutableBuffer { .. } => false, State::ReadBuffer { .. } => false, From b3c94b9d6580959de01fc6a82f666d7de5dfb36f Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Wed, 9 Jun 2021 14:40:10 -0400 Subject: [PATCH 7/8] refactor: change order of fields to pass circle CI tests --- query/src/provider.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/query/src/provider.rs b/query/src/provider.rs index 305218d367..a091ca6f47 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -687,8 +687,8 @@ mod test { let chunk = Arc::new( TestChunk::new(1) .with_time_column("t") - .with_int_field_column("t", "field_int") .with_tag_column("t", "tag1") + .with_int_field_column("t", "field_int") .with_five_rows_of_data("t"), ); @@ -740,10 +740,10 @@ mod test { // Chunk 1 with 5 rows of data let chunk = Arc::new( TestChunk::new(1) - .with_time_column("t") - .with_int_field_column("t", "field_int") - .with_tag_column("t", "tag2") .with_tag_column("t", "tag1") + .with_tag_column("t", "tag2") + .with_int_field_column("t", "field_int") + .with_time_column("t") .with_five_rows_of_data("t"), ); From 4478d900ee723e8fac25c22756d14cbdea40a5a7 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Wed, 9 Jun 2021 15:09:13 -0400 Subject: [PATCH 8/8] refactor: capture test output --- query/src/provider.rs | 74 +++++++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/query/src/provider.rs b/query/src/provider.rs index e3a209e41e..5622675025 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -676,15 +676,15 @@ mod test { let batch = collect(Arc::clone(&input)).await.unwrap(); // data in its original non-sorted form let expected = vec![ - "+------+-----------+-------------------------------+", - "| tag1 | field_int | time |", - "+------+-----------+-------------------------------+", - "| MT | 1000 | 1970-01-01 00:00:00.000001 |", - "| MT | 10 | 1970-01-01 00:00:00.000007 |", - "| CT | 70 | 1970-01-01 00:00:00.000000100 |", - "| AL | 100 | 1970-01-01 00:00:00.000000050 |", - "| MT | 5 | 1970-01-01 00:00:00.000005 |", - "+------+-----------+-------------------------------+", + "+-----------+------+-------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-------------------------------+", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 5 | MT | 1970-01-01 00:00:00.000005 |", + "+-----------+------+-------------------------------+", ]; assert_batches_eq!(&expected, &batch); @@ -693,15 +693,15 @@ mod test { let batch = collect(sort_plan.unwrap()).await.unwrap(); // data is not sorted on primary key(tag1, tag2, time) let expected = vec![ - "+------+-----------+-------------------------------+", - "| tag1 | field_int | time |", - "+------+-----------+-------------------------------+", - "| AL | 100 | 1970-01-01 00:00:00.000000050 |", - "| CT | 70 | 1970-01-01 00:00:00.000000100 |", - "| MT | 1000 | 1970-01-01 00:00:00.000001 |", - "| MT | 5 | 1970-01-01 00:00:00.000005 |", - "| MT | 10 | 1970-01-01 00:00:00.000007 |", - "+------+-----------+-------------------------------+", + "+-----------+------+-------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-------------------------------+", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 5 | MT | 1970-01-01 00:00:00.000005 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "+-----------+------+-------------------------------+", ]; assert_batches_eq!(&expected, &batch); } @@ -711,10 +711,10 @@ mod test { // Chunk 1 with 5 rows of data let chunk = Arc::new( TestChunk::new(1) + .with_time_column("t") .with_tag_column("t", "tag1") .with_tag_column("t", "tag2") .with_int_field_column("t", "field_int") - .with_time_column("t") .with_five_rows_of_data("t"), ); @@ -731,15 +731,15 @@ mod test { let batch = collect(Arc::clone(&input)).await.unwrap(); // data in its original non-sorted form let expected = vec![ - "+------+------+-----------+-------------------------------+", - "| tag1 | tag2 | field_int | time |", - "+------+------+-----------+-------------------------------+", - "| MT | CT | 1000 | 1970-01-01 00:00:00.000001 |", - "| MT | AL | 10 | 1970-01-01 00:00:00.000007 |", - "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |", - "| AL | MA | 100 | 1970-01-01 00:00:00.000000050 |", - "| MT | AL | 5 | 1970-01-01 00:00:00.000005 |", - "+------+------+-----------+-------------------------------+", + "+-----------+------+------+-------------------------------+", + "| field_int | tag1 | tag2 | time |", + "+-----------+------+------+-------------------------------+", + "| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |", + "| 10 | MT | AL | 1970-01-01 00:00:00.000007 |", + "| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |", + "| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |", + "| 5 | MT | AL | 1970-01-01 00:00:00.000005 |", + "+-----------+------+------+-------------------------------+", ]; assert_batches_eq!(&expected, &batch); @@ -748,15 +748,15 @@ mod test { let batch = collect(sort_plan.unwrap()).await.unwrap(); // data is not sorted on primary key(tag1, tag2, time) let expected = vec![ - "+------+------+-----------+-------------------------------+", - "| tag1 | tag2 | field_int | time |", - "+------+------+-----------+-------------------------------+", - "| AL | MA | 100 | 1970-01-01 00:00:00.000000050 |", - "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |", - "| MT | AL | 5 | 1970-01-01 00:00:00.000005 |", - "| MT | AL | 10 | 1970-01-01 00:00:00.000007 |", - "| MT | CT | 1000 | 1970-01-01 00:00:00.000001 |", - "+------+------+-----------+-------------------------------+", + "+-----------+------+------+-------------------------------+", + "| field_int | tag1 | tag2 | time |", + "+-----------+------+------+-------------------------------+", + "| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |", + "| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |", + "| 5 | MT | AL | 1970-01-01 00:00:00.000005 |", + "| 10 | MT | AL | 1970-01-01 00:00:00.000007 |", + "| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |", + "+-----------+------+------+-------------------------------+", ]; assert_batches_eq!(&expected, &batch); }