diff --git a/query/src/duplicate.rs b/query/src/duplicate.rs index 39687d55c3..6810df0101 100644 --- a/query/src/duplicate.rs +++ b/query/src/duplicate.rs @@ -138,30 +138,95 @@ where } } - /// Returns true if the chunk has a potential primary key overlap with the other chunk + /// Returns true if the chunk has a potential primary key overlap + /// with the other chunk. + /// + /// This this algorithm is O(2^N) in the worst case. However, the + /// pathological case is where two chunks each have a large + /// numbers of tags that have no overlap, which seems unlikely in + /// the real world. + /// + /// Note this algoritm is quite conservative (in that it will + /// assume that any column can contain nulls) and thus can match + /// with chunks that do not have that column. for example + /// + /// Chunk 1: tag_a + /// Chunk 2: tag_a, tag_b + /// + /// In this case Chunk 2 has values for tag_b but Chunk 1 + /// doesn't have any values in tag_b (its values are implicitly + /// null) + /// + /// If Chunk 2 has any null values in the tag_b column, it could + /// overlap with Chunk 1 (as logically there can be rows with + /// (tag_a = NULL, tag_b = NULL) in both chunks + /// + /// We could make this algorithm significantly less conservative + /// if we stored the Null count in the ColumnSummary (and thus + /// could rule out matches with columns that were not present) if + /// there were no NULLs fn potential_overlap(&self, other: &Self) -> Result { - // in order to have overlap, *all* the columns in the sort order - // need to be the same. Note gaps in the sort order mean they - // are for different parts of the keyspace - if self.key_summaries.len() != other.key_summaries.len() { - // Short circuit on different lengths - return Ok(false); - } + // This algorithm assumes that the keys are sorted by name (so + // they can't appear in different orders on the two sides) + debug_assert!(self + .key_summaries + .windows(2) + .all(|s| s[0].name <= s[1].name)); + debug_assert!(other + .key_summaries + .windows(2) + .all(|s| s[0].name <= s[1].name)); + self.potential_overlap_impl(0, other, 0) + } - let iter = self.key_summaries.iter().zip(other.key_summaries.iter()); - for (s1, s2) in iter { - if s1.name != s2.name || !Self::columns_might_overlap(s1, s2)? { - return Ok(false); + // Checks the remainder of self.columns[self_idx..] and + // other.columns[..other_idx] if they are compatible + fn potential_overlap_impl( + &self, + mut self_idx: usize, + other: &Self, + mut other_idx: usize, + ) -> Result { + loop { + let s1 = self.key_summaries.get(self_idx); + let s2 = other.key_summaries.get(other_idx); + + if let (Some(s1), Some(s2)) = (s1, s2) { + if s1.name == s2.name { + // pk matched in this position, so check values. If we + // find no overlap, know this is false, otherwise need to keep checking + if Self::columns_might_overlap(s1, s2)? { + self_idx += 1; + other_idx += 1; + } else { + return Ok(false); + } + } else { + // name didn't match, so try and find the next + // place it does. Since there may be missing keys + // in each side, need to check each in turn + // + // Note this will result in O(num_tags) stack + // frames in the worst case, but we expect the + // number of tags to be relatively small (~20 at + // the time of this writing) + return Ok(self.potential_overlap_impl(self_idx + 1, other, other_idx)? + || self.potential_overlap_impl(self_idx, other, other_idx + 1)?); + } + } else { + // ran out of columns to check on one side, assume the + // other could have nulls all the way down (due to null + // assumption) + return Ok(true); } } - - Ok(true) } /// Returns true if the two columns MAY overlap other, based on /// statistics pub fn columns_might_overlap(s1: &ColumnSummary, s2: &ColumnSummary) -> Result { use Statistics::*; + let overlap = match (&s1.stats, &s2.stats) { (I64(s1), I64(s2)) => s1.overlaps(s2), (U64(s1), U64(s2)) => s1.overlaps(s2), @@ -176,14 +241,15 @@ where } }; - // If either column has no min/max, treat the column as - // being entirely null + // If either column has no min/max, treat the column as being + // entirely null, meaning that it could overlap the other + // stats if it had nulls. let is_none = s1.stats.is_none() || s2.stats.is_none(); match overlap { StatOverlap::NonZero => Ok(true), StatOverlap::Zero => Ok(false), - StatOverlap::Unknown if is_none => Ok(false), // no stats means no values + StatOverlap::Unknown if is_none => Ok(true), // This case means there some stats, but not all. // Unclear how this could happen, so throw an error for now StatOverlap::Unknown => InternalPartialStatistics { @@ -313,7 +379,27 @@ mod test { let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); - let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + // the overlap could come when (tag1 = NULL, tag2=NULL) which + // could exist in either chunk + let expected = vec!["Group 0: [chunk1, chunk2]"]; + assert_groups_eq!(expected, groups); + } + + #[test] + fn different_tag_names_multi_tags() { + // check that if chunks overlap but in different tag names + let c1 = TestChunk::new("chunk1") + .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_tag("tag2", Some("aaa"), Some("bbb")); + + let c2 = TestChunk::new("chunk2") + .with_tag("tag2", Some("aaa"), Some("bbb")) + .with_tag("tag3", Some("aaa"), Some("bbb")); + + let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); + + // the overlap could come when (tag1 = NULL, tag2, tag3=NULL) + let expected = vec!["Group 0: [chunk1, chunk2]"]; assert_groups_eq!(expected, groups); } @@ -370,12 +456,13 @@ mod test { let c2 = TestChunk::new("chunk2") // tag1 and timestamp overlap, but no tag2 (aka it is all null) + // so it could overlap if there was a null tag2 value in chunk1 .with_tag("tag1", Some("aaa"), Some("bbb")) .with_timestamp(500, 1000); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); - let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + let expected = vec!["Group 0: [chunk1, chunk2]"]; assert_groups_eq!(expected, groups); } @@ -387,15 +474,15 @@ mod test { .with_timestamp(0, 1000); let c2 = TestChunk::new("chunk2") - // tag1 and timestamp overlap, tag2 has no stats (null) - // so we say they can't overlap + // tag1 and timestamp overlap, tag2 has no stats (is all null) + // so they might overlap if chunk1 had a null in tag 2 .with_tag("tag1", Some("aaa"), Some("bbb")) .with_tag("tag2", None, None) .with_timestamp(500, 1000); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); - let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + let expected = vec!["Group 0: [chunk1, chunk2]"]; assert_groups_eq!(expected, groups); } @@ -445,7 +532,7 @@ mod test { #[test] fn mismatched_types() { - // Test if same column has different types in different + // When the same column has different types in different // chunks; this will likely cause errors elsewhere in practice // as the schemas are incompatible (and can't be merged) let c1 = TestChunk::new("chunk1") @@ -454,14 +541,14 @@ mod test { let c2 = TestChunk::new("chunk2") // tag1 column is actually a field is different in chunk - // 2, so even though the timestamps overlap these chunks - // don't have duplicates + // 2, so since the timestamps overlap these chunks + // might also have duplicates (if tag1 was null in c1) .with_int_field("tag1", Some(100), Some(200)) .with_timestamp(0, 1000); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); - let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"]; + let expected = vec!["Group 0: [chunk1, chunk2]"]; assert_groups_eq!(expected, groups); } @@ -477,7 +564,7 @@ mod test { } /// Mocked out prunable provider to use testing overlaps - #[derive(Debug)] + #[derive(Debug, Clone)] struct TestChunk { // The name of this chunk name: String, diff --git a/query/src/lib.rs b/query/src/lib.rs index 197c96ca4a..316d85c5db 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -122,6 +122,9 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync { predicate: &Predicate, selection: Selection<'_>, ) -> Result; + + /// Returns true if data of this chunk is sorted + fn is_sorted_on_pk(&self) -> bool; } #[async_trait] diff --git a/query/src/provider.rs b/query/src/provider.rs index 0e308cf55e..5622675025 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -2,7 +2,7 @@ use std::sync::Arc; -use arrow::{datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError}; +use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError}; use datafusion::{ datasource::{ datasource::{Statistics, TableProviderFilterPushDown}, @@ -10,7 +10,11 @@ use datafusion::{ }, error::{DataFusionError, Result as DataFusionResult}, logical_plan::Expr, - physical_plan::ExecutionPlan, + physical_plan::{ + expressions::{col, PhysicalSortExpr}, + sort::SortExec, + ExecutionPlan, + }, }; use internal_types::schema::{merge::SchemaMerger, Schema}; use observability_deps::tracing::debug; @@ -50,6 +54,11 @@ pub enum Error { source: datafusion::error::DataFusionError, }, + #[snafu(display("Internal error adding sort operator '{}'", source,))] + InternalSort { + source: datafusion::error::DataFusionError, + }, + #[snafu(display("Internal error: Can not group chunks '{}'", source,))] InternalChunkGrouping { source: crate::duplicate::Error }, } @@ -374,7 +383,7 @@ impl Deduplicater { Arc::clone(&schema), chunk_with_duplicates.to_owned(), predicate.clone(), - )); + )?); } // Go over non_duplicates_chunks, build a plan for it @@ -495,14 +504,54 @@ impl Deduplicater { schema: ArrowSchemaRef, chunk: Arc, // This chunk is identified having duplicates predicate: Predicate, - ) -> Arc { - // // TODO - // // Currently return just like there are no overlaps, no duplicates - Arc::new(IOxReadFilterNode::new( + ) -> Result> { + // Create the bottom node IOxReadFilterNode for this chunk + let input: Arc = Arc::new(IOxReadFilterNode::new( Arc::clone(&table_name), schema, - vec![chunk], + vec![Arc::clone(&chunk)], predicate, + )); + + // Add the sort operator, SortExec, if needed + //let plan = Self::build_sort_plan(chunk, input); + Self::build_sort_plan(chunk, input) + + // Create DeduplicateExc + // TODO: Add DeuplicateExec here when it is implemented in https://github.com/influxdata/influxdb_iox/issues/1646 + //plan = add_deduplicate_exec(plan); + + //plan + } + + /// Add SortExec operator on top of the input plan of the given chunk + /// The plan will be sorted on the chunk's primary key + fn build_sort_plan( + chunk: Arc, + input: Arc, + ) -> Result> { + if chunk.is_sorted_on_pk() { + return Ok(input); + } + + // Sort the chunk on pk + let key_summaries = chunk.summary().primary_key_columns(); + + // build sort expression + let mut sort_exprs = vec![]; + for key in key_summaries { + sort_exprs.push(PhysicalSortExpr { + expr: col(key.name.as_str()), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }); + } + + // Create SortExec operator + Ok(Arc::new( + SortExec::try_new(sort_exprs, input).context(InternalSort)?, )) } @@ -560,6 +609,10 @@ impl ChunkPruner for NoOpPruner { #[cfg(test)] mod test { + use arrow_util::assert_batches_eq; + use datafusion::physical_plan::collect; + use internal_types::selection::Selection; + use crate::test::TestChunk; use super::*; @@ -599,6 +652,115 @@ mod test { assert_eq!(chunk_ids(&deduplicator.no_duplicates_chunks), "1"); } + #[tokio::test] + async fn sort_planning_one_tag_with_time() { + // Chunk 1 with 5 rows of data + let chunk = Arc::new( + TestChunk::new(1) + .with_time_column("t") + .with_tag_column("t", "tag1") + .with_int_field_column("t", "field_int") + .with_five_rows_of_data("t"), + ); + + // Datafusion schema of the chunk + let schema = chunk.table_schema(Selection::All).unwrap().as_arrow(); + + // IOx scan operator + let input: Arc = Arc::new(IOxReadFilterNode::new( + Arc::from("t"), + schema, + vec![Arc::clone(&chunk)], + Predicate::default(), + )); + let batch = collect(Arc::clone(&input)).await.unwrap(); + // data in its original non-sorted form + let expected = vec![ + "+-----------+------+-------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-------------------------------+", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 5 | MT | 1970-01-01 00:00:00.000005 |", + "+-----------+------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &batch); + + // Add Sort operator on top of IOx scan + let sort_plan = Deduplicater::build_sort_plan(chunk, input); + let batch = collect(sort_plan.unwrap()).await.unwrap(); + // data is not sorted on primary key(tag1, tag2, time) + let expected = vec![ + "+-----------+------+-------------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-------------------------------+", + "| 100 | AL | 1970-01-01 00:00:00.000000050 |", + "| 70 | CT | 1970-01-01 00:00:00.000000100 |", + "| 1000 | MT | 1970-01-01 00:00:00.000001 |", + "| 5 | MT | 1970-01-01 00:00:00.000005 |", + "| 10 | MT | 1970-01-01 00:00:00.000007 |", + "+-----------+------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &batch); + } + + #[tokio::test] + async fn sort_planning_two_tags_with_time() { + // Chunk 1 with 5 rows of data + let chunk = Arc::new( + TestChunk::new(1) + .with_time_column("t") + .with_tag_column("t", "tag1") + .with_tag_column("t", "tag2") + .with_int_field_column("t", "field_int") + .with_five_rows_of_data("t"), + ); + + // Datafusion schema of the chunk + let schema = chunk.table_schema(Selection::All).unwrap().as_arrow(); + + // IOx scan operator + let input: Arc = Arc::new(IOxReadFilterNode::new( + Arc::from("t"), + schema, + vec![Arc::clone(&chunk)], + Predicate::default(), + )); + let batch = collect(Arc::clone(&input)).await.unwrap(); + // data in its original non-sorted form + let expected = vec![ + "+-----------+------+------+-------------------------------+", + "| field_int | tag1 | tag2 | time |", + "+-----------+------+------+-------------------------------+", + "| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |", + "| 10 | MT | AL | 1970-01-01 00:00:00.000007 |", + "| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |", + "| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |", + "| 5 | MT | AL | 1970-01-01 00:00:00.000005 |", + "+-----------+------+------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &batch); + + // Add Sort operator on top of IOx scan + let sort_plan = Deduplicater::build_sort_plan(chunk, input); + let batch = collect(sort_plan.unwrap()).await.unwrap(); + // data is not sorted on primary key(tag1, tag2, time) + let expected = vec![ + "+-----------+------+------+-------------------------------+", + "| field_int | tag1 | tag2 | time |", + "+-----------+------+------+-------------------------------+", + "| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |", + "| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |", + "| 5 | MT | AL | 1970-01-01 00:00:00.000005 |", + "| 10 | MT | AL | 1970-01-01 00:00:00.000007 |", + "| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |", + "+-----------+------+------+-------------------------------+", + ]; + assert_batches_eq!(&expected, &batch); + } + fn chunk_ids(group: &[Arc]) -> String { let ids = group.iter().map(|c| c.id().to_string()).collect::>(); ids.join(", ") diff --git a/query/src/test.rs b/query/src/test.rs index 12f3584dab..59a8183aab 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -360,6 +360,79 @@ impl TestChunk { }) .collect::>(); + let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch"); + println!("TestChunk batch data: {:#?}", batch); + + self.table_data.push(Arc::new(batch)); + self + } + + /// Prepares this chunk to return a specific record batch with five + /// rows of non null data that look like + /// "+------+------+-----------+-------------------------------+", + /// "| tag1 | tag2 | field_int | time |", + /// "+------+------+-----------+-------------------------------+", + /// "| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |", + /// "| MT | MT | 10 | 1970-01-01 00:00:00.000007 |", + /// "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |", + /// "| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |", + /// "| MT | MT | 5 | 1970-01-01 00:00:00.000005 |", + /// "+------+------+-----------+-------------------------------+", + pub fn with_five_rows_of_data(mut self, _table_name: impl Into) -> Self { + //let table_name = table_name.into(); + let schema = self + .table_schema + .as_ref() + .expect("table must exist in TestChunk"); + + // create arrays + let columns = schema + .iter() + .map(|(_influxdb_column_type, field)| match field.data_type() { + DataType::Int64 => { + Arc::new(Int64Array::from(vec![1000, 10, 70, 100, 5])) as ArrayRef + } + DataType::Utf8 => { + match field.name().as_str() { + "tag1" => Arc::new(StringArray::from(vec!["MT", "MT", "CT", "AL", "MT"])) + as ArrayRef, + "tag2" => Arc::new(StringArray::from(vec!["CT", "AL", "CT", "MA", "AL"])) + as ArrayRef, + _ => Arc::new(StringArray::from(vec!["CT", "MT", "AL", "AL", "MT"])) + as ArrayRef, + } + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new( + TimestampNanosecondArray::from_vec(vec![1000, 7000, 100, 50, 5000], None), + ) as ArrayRef, + DataType::Dictionary(key, value) + if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => + { + match field.name().as_str() { + "tag1" => Arc::new( + vec!["MT", "MT", "CT", "AL", "MT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + "tag2" => Arc::new( + vec!["CT", "AL", "CT", "MA", "AL"] + .into_iter() + .collect::>(), + ) as ArrayRef, + _ => Arc::new( + vec!["CT", "MT", "AL", "AL", "MT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + } + } + _ => unimplemented!( + "Unimplemented data type for test database: {:?}", + field.data_type() + ), + }) + .collect::>(); + let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch"); self.table_data.push(Arc::new(batch)); @@ -422,6 +495,11 @@ impl PartitionChunk for TestChunk { Ok(Box::pin(stream)) } + /// Returns true if data of this chunk is sorted + fn is_sorted_on_pk(&self) -> bool { + false + } + fn apply_predicate(&self, predicate: &Predicate) -> Result { self.check_error()?; diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 6fe64ddcc8..014af96ff8 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -449,6 +449,15 @@ impl PartitionChunk for DbChunk { } } } + + // TODOs: return the right value. For now the chunk is assumed to be not sorted + fn is_sorted_on_pk(&self) -> bool { + match &self.state { + State::MutableBuffer { .. } => false, + State::ReadBuffer { .. } => false, + State::ParquetFile { .. } => false, + } + } } impl Prunable for DbChunk {