diff --git a/query/src/lib.rs b/query/src/lib.rs
index 7e9d413883..316d85c5db 100644
--- a/query/src/lib.rs
+++ b/query/src/lib.rs
@@ -124,7 +124,7 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
     ) -> Result<SendableRecordBatchStream, Self::Error>;
 
     /// Returns true if data of this chunk is sorted
-    fn is_sorted(&self) -> bool;
+    fn is_sorted_on_pk(&self) -> bool;
 }
 
 #[async_trait]
diff --git a/query/src/provider.rs b/query/src/provider.rs
index 058ab895e2..305218d367 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -534,7 +534,7 @@ impl Deduplicater {
         chunk: Arc<C>, // This chunk is identified as having duplicates
         predicate: Predicate,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        // Create the bottom node IOxRedFilterNode for this chunk
+        // Create the bottom node IOxReadFilterNode for this chunk
         let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
             Arc::clone(&table_name),
             schema,
@@ -559,28 +559,29 @@ impl Deduplicater {
         chunk: Arc<C>,
         input: Arc<dyn ExecutionPlan>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if !chunk.is_sorted() {
-            let key_summaries = chunk.summary().primary_key_columns();
-
-            // build sort expression
-            let mut sort_exprs = vec![];
-            for key in key_summaries {
-                sort_exprs.push(PhysicalSortExpr {
-                    expr: col(key.name.as_str()),
-                    options: SortOptions {
-                        descending: false,
-                        nulls_first: false,
-                    },
-                });
-            }
-
-            // Create SortExec operator
-            Ok(Arc::new(
-                SortExec::try_new(sort_exprs, input).context(InternalSort)?,
-            ))
-        } else {
-            Ok(input)
+        if chunk.is_sorted_on_pk() {
+            return Ok(input);
         }
+
+        // Sort the chunk on pk
+        let key_summaries = chunk.summary().primary_key_columns();
+
+        // build sort expression
+        let mut sort_exprs = vec![];
+        for key in key_summaries {
+            sort_exprs.push(PhysicalSortExpr {
+                expr: col(key.name.as_str()),
+                options: SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                },
+            });
+        }
+
+        // Create SortExec operator
+        Ok(Arc::new(
+            SortExec::try_new(sort_exprs, input).context(InternalSort)?,
+        ))
     }
 
     /// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
@@ -637,10 +638,9 @@ impl ChunkPruner for NoOpPruner {
 #[cfg(test)]
 mod test {
-    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use arrow_util::assert_batches_eq;
     use datafusion::physical_plan::collect;
-    use internal_types::schema::TIME_COLUMN_NAME;
+    use internal_types::selection::Selection;
 
     use crate::test::TestChunk;
@@ -682,7 +682,61 @@ mod test {
     }
 
     #[tokio::test]
-    async fn sort_planning() {
+    async fn sort_planning_one_tag_with_time() {
+        // Chunk 1 with 5 rows of data
+        let chunk = Arc::new(
+            TestChunk::new(1)
+                .with_time_column("t")
+                .with_int_field_column("t", "field_int")
+                .with_tag_column("t", "tag1")
+                .with_five_rows_of_data("t"),
+        );
+
+        // Datafusion schema of the chunk
+        let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
+
+        // IOx scan operator
+        let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
+            Arc::from("t"),
+            schema,
+            vec![Arc::clone(&chunk)],
+            Predicate::default(),
+        ));
+        let batch = collect(Arc::clone(&input)).await.unwrap();
+        // data in its original non-sorted form
+        let expected = vec![
+            "+------+-----------+-------------------------------+",
+            "| tag1 | field_int | time                          |",
+            "+------+-----------+-------------------------------+",
+            "| MT   | 1000      | 1970-01-01 00:00:00.000001    |",
+            "| MT   | 10        | 1970-01-01 00:00:00.000007    |",
+            "| CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+            "| AL   | 100       | 1970-01-01 00:00:00.000000050 |",
+            "| MT   | 5         | 1970-01-01 00:00:00.000005    |",
+            "+------+-----------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+
+        // Add Sort operator on top of IOx scan
+        let sort_plan = Deduplicater::build_sort_plan(chunk, input);
+        let batch = collect(sort_plan.unwrap()).await.unwrap();
+        // data is now sorted on primary key (tag1, time)
+        let expected = vec![
+            "+------+-----------+-------------------------------+",
+            "| tag1 | field_int | time                          |",
+            "+------+-----------+-------------------------------+",
+            "| AL   | 100       | 1970-01-01 00:00:00.000000050 |",
+            "| CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+            "| MT   | 1000      | 1970-01-01 00:00:00.000001    |",
+            "| MT   | 5         | 1970-01-01 00:00:00.000005    |",
+            "| MT   | 10        | 1970-01-01 00:00:00.000007    |",
+            "+------+-----------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
+    #[tokio::test]
+    async fn sort_planning_two_tags_with_time() {
         // Chunk 1 with 5 rows of data
         let chunk = Arc::new(
             TestChunk::new(1)
@@ -690,28 +744,11 @@ mod test {
                 .with_int_field_column("t", "field_int")
                 .with_tag_column("t", "tag2")
                 .with_tag_column("t", "tag1")
-                .with_five_row_of_null_data("t"),
+                .with_five_rows_of_data("t"),
         );
 
         // Datafusion schema of the chunk
-        let schema = Arc::new(Schema::new(vec![
-            Field::new(
-                "tag1",
-                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-                true,
-            ),
-            Field::new(
-                "tag2",
-                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
-                true,
-            ),
-            Field::new("field_int", DataType::Int64, true),
-            Field::new(
-                TIME_COLUMN_NAME,
-                DataType::Timestamp(TimeUnit::Nanosecond, None),
-                true,
-            ),
-        ]));
+        let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
 
         // IOx scan operator
         let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
@@ -726,11 +763,11 @@ mod test {
             "+------+------+-----------+-------------------------------+",
             "| tag1 | tag2 | field_int | time                          |",
             "+------+------+-----------+-------------------------------+",
-            "| MA   | MA   | 1000      | 1970-01-01 00:00:00.000001    |",
-            "| MT   | MT   | 10        | 1970-01-01 00:00:00.000007    |",
+            "| MT   | CT   | 1000      | 1970-01-01 00:00:00.000001    |",
+            "| MT   | AL   | 10        | 1970-01-01 00:00:00.000007    |",
             "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000100 |",
-            "| AL   | AL   | 100       | 1970-01-01 00:00:00.000000050 |",
-            "| MT   | MT   | 5         | 1970-01-01 00:00:00.000005    |",
+            "| AL   | MA   | 100       | 1970-01-01 00:00:00.000000050 |",
+            "| MT   | AL   | 5         | 1970-01-01 00:00:00.000005    |",
             "+------+------+-----------+-------------------------------+",
         ];
         assert_batches_eq!(&expected, &batch);
@@ -743,11 +780,11 @@ mod test {
             "+------+------+-----------+-------------------------------+",
             "| tag1 | tag2 | field_int | time                          |",
             "+------+------+-----------+-------------------------------+",
-            "| AL   | AL   | 100       | 1970-01-01 00:00:00.000000050 |",
+            "| AL   | MA   | 100       | 1970-01-01 00:00:00.000000050 |",
             "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000100 |",
-            "| MA   | MA   | 1000      | 1970-01-01 00:00:00.000001    |",
-            "| MT   | MT   | 5         | 1970-01-01 00:00:00.000005    |",
-            "| MT   | MT   | 10        | 1970-01-01 00:00:00.000007    |",
+            "| MT   | AL   | 5         | 1970-01-01 00:00:00.000005    |",
+            "| MT   | AL   | 10        | 1970-01-01 00:00:00.000007    |",
+            "| MT   | CT   | 1000      | 1970-01-01 00:00:00.000001    |",
             "+------+------+-----------+-------------------------------+",
         ];
         assert_batches_eq!(&expected, &batch);
diff --git a/query/src/test.rs b/query/src/test.rs
index c40a519a3f..3ca14246fe 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -380,7 +380,7 @@ impl TestChunk {
     /// "| AL   | AL   | 100       | 1970-01-01 00:00:00.000000050 |",
     /// "| MT   | MT   | 5         | 1970-01-01 00:00:00.000005    |",
"+------+------+-----------+-------------------------------+", - pub fn with_five_row_of_null_data(mut self, _table_name: impl Into) -> Self { + pub fn with_five_rows_of_data(mut self, _table_name: impl Into) -> Self { //let table_name = table_name.into(); let schema = self .table_schema @@ -395,7 +395,14 @@ impl TestChunk { Arc::new(Int64Array::from(vec![1000, 10, 70, 100, 5])) as ArrayRef } DataType::Utf8 => { - Arc::new(StringArray::from(vec!["MA", "MT", "CT", "AL", "MT"])) as ArrayRef + match field.name().as_str() { + "tag1" => Arc::new(StringArray::from(vec!["MT", "MT", "CT", "AL", "MT"])) + as ArrayRef, + "tag2" => Arc::new(StringArray::from(vec!["CT", "AL", "CT", "MA", "AL"])) + as ArrayRef, + _ => Arc::new(StringArray::from(vec!["CT", "MT", "AL", "AL", "MT"])) + as ArrayRef, + } } DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new( TimestampNanosecondArray::from_vec(vec![1000, 7000, 100, 50, 5000], None), @@ -403,9 +410,23 @@ impl TestChunk { DataType::Dictionary(key, value) if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => { - let dict: DictionaryArray = - vec!["MA", "MT", "CT", "AL", "MT"].into_iter().collect(); - Arc::new(dict) as ArrayRef + match field.name().as_str() { + "tag1" => Arc::new( + vec!["MT", "MT", "CT", "AL", "MT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + "tag2" => Arc::new( + vec!["CT", "AL", "CT", "MA", "AL"] + .into_iter() + .collect::>(), + ) as ArrayRef, + _ => Arc::new( + vec!["CT", "MT", "AL", "AL", "MT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + } } _ => unimplemented!( "Unimplemented data type for test database: {:?}", @@ -415,7 +436,6 @@ impl TestChunk { .collect::>(); let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch"); - println!("TestChunk batch data: {:#?}", batch); self.table_data.push(Arc::new(batch)); self @@ -478,7 +498,7 @@ impl PartitionChunk for TestChunk { } /// Returns true if data of this chunk is sorted - fn is_sorted(&self) -> bool { + fn is_sorted_on_pk(&self) -> bool { false } diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 6188061d41..014af96ff8 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -451,7 +451,7 @@ impl PartitionChunk for DbChunk { } // TODOs: return the right value. For now the chunk is assumed to be not sorted - fn is_sorted(&self) -> bool { + fn is_sorted_on_pk(&self) -> bool { match &self.state { State::MutableBuffer { .. } => false, State::ReadBuffer { .. } => false,