diff --git a/internal_types/src/schema.rs b/internal_types/src/schema.rs index 66d8fde033..bfb8daa8a8 100644 --- a/internal_types/src/schema.rs +++ b/internal_types/src/schema.rs @@ -251,6 +251,23 @@ impl Schema { self.inner = Arc::new(new_schema); } + /// Returns true if the sort key includes all primary key columns + pub fn is_sorted_on_pk(&self) -> bool { + if let Some(sort_key) = self.sort_key() { + let key_columns = self.primary_key(); + + for key_col in key_columns { + if sort_key.get(key_col).is_none() { + return false; // pk col is not part of the sort key + } + } + true + } else { + // not sorted yet + false + } + } + /// Provide a reference to the underlying Arrow Schema object pub fn inner(&self) -> &ArrowSchemaRef { &self.inner @@ -1295,4 +1312,86 @@ mod test { } } if &column_name == "time" )); } + + #[test] + fn test_is_sort_on_pk() { + // Sort key the same as pk + let mut sort_key = SortKey::with_capacity(3); + sort_key.with_col("tag4"); + sort_key.with_col("tag3"); + sort_key.with_col("tag2"); + sort_key.with_col("tag1"); + sort_key.with_col(TIME_COLUMN_NAME); + + let schema = SchemaBuilder::new() + .influx_field("the_field", String) + .tag("tag1") + .tag("tag2") + .tag("tag3") + .tag("tag4") + .timestamp() + .measurement("the_measurement") + .build_with_sort_key(&sort_key) + .unwrap(); + + assert!(schema.is_sorted_on_pk()); + + // Sort key does not include all pk cols + let mut sort_key = SortKey::with_capacity(3); + sort_key.with_col("tag3"); + sort_key.with_col("tag1"); + sort_key.with_col(TIME_COLUMN_NAME); + + let schema = SchemaBuilder::new() + .influx_field("the_field", String) + .tag("tag1") + .tag("tag2") + .tag("tag3") + .tag("tag4") + .timestamp() + .measurement("the_measurement") + .build_with_sort_key(&sort_key) + .unwrap(); + + assert!(!schema.is_sorted_on_pk()); + + // No sort key + let schema = SchemaBuilder::new() + .influx_field("the_field", String) + .tag("tag1") + .tag("tag2") + .tag("tag3") + .tag("tag4") + .timestamp() + 
.measurement("the_measurement") + .build() + .unwrap(); + + assert!(!schema.is_sorted_on_pk()); + + // No PK, no sort key + let schema = SchemaBuilder::new() + .influx_field("the_field", String) + .measurement("the_measurement") + .build() + .unwrap(); + assert!(!schema.is_sorted_on_pk()); + + // Has PK, but the sort key only covers a non-pk column + let mut sort_key = SortKey::with_capacity(3); + sort_key.with_col("the_field"); + + let schema = SchemaBuilder::new() + .influx_field("the_field", String) + .tag("tag1") + .tag("tag2") + .tag("tag3") + .tag("tag4") + .timestamp() + .measurement("the_measurement") + .build_with_sort_key(&sort_key) + .unwrap(); + + assert!(!schema.is_sorted_on_pk()); + } } diff --git a/internal_types/src/schema/sort.rs b/internal_types/src/schema/sort.rs index a469c9791b..a56fd0a495 100644 --- a/internal_types/src/schema/sort.rs +++ b/internal_types/src/schema/sort.rs @@ -496,4 +496,66 @@ mod tests { let merge_key = SortKey::try_merge_key(&key_abc_2, &key_bac_2); assert_eq!(merge_key, None); } + + #[test] + fn test_selected_sort_key() { + let mut sort_key = SortKey::with_capacity(4); + sort_key.with_col("a"); // default sort option + sort_key.with_col_opts("b", true, false); + sort_key.with_col_opts("c", false, false); + sort_key.with_col(TIME_COLUMN_NAME); + + // input cols is empty -> nothing selected + let cols = vec![]; + let selected_key = sort_key.selected_sort_key(cols); + assert!(selected_key.is_empty()); + + // input cols is not part of the key -> nothing selected + let cols = vec!["d", "e"]; + let selected_key = sort_key.selected_sort_key(cols); + assert!(selected_key.is_empty()); + + // input cols exactly the same and in the same order -> exact sort_key selected + let cols = vec!["a", "b", "c", TIME_COLUMN_NAME]; + let selected_key = sort_key.selected_sort_key(cols); + assert_eq!(selected_key, sort_key); + + // input cols exactly the same but in different order -> exact sort_key selected + let cols = vec!["c", TIME_COLUMN_NAME, "b", "a"]; + let 
selected_key = sort_key.selected_sort_key(cols); + assert_eq!(selected_key, sort_key); + + // input cols is subset but in the same order -> subset selected + let cols = vec!["a", "b"]; + let selected_key = sort_key.selected_sort_key(cols); + let mut expected_key = SortKey::with_capacity(2); + expected_key.with_col("a"); // default sort option + expected_key.with_col_opts("b", true, false); + assert_eq!(selected_key, expected_key); + + // input cols is subset but in the same order -> subset selected + let cols = vec![TIME_COLUMN_NAME]; + let selected_key = sort_key.selected_sort_key(cols); + let mut expected_key = SortKey::with_capacity(1); + expected_key.with_col(TIME_COLUMN_NAME); + assert_eq!(selected_key, expected_key); + + // input cols is subset but in the same order with gap -> subset selected + let cols = vec!["a", "c", TIME_COLUMN_NAME]; + let selected_key = sort_key.selected_sort_key(cols); + let mut expected_key = SortKey::with_capacity(3); + expected_key.with_col("a"); // default sort option + expected_key.with_col_opts("c", false, false); + expected_key.with_col(TIME_COLUMN_NAME); + assert_eq!(selected_key, expected_key); + + // input cols is subset but in different order -> subset in the order with sort_key selected + let cols = vec![TIME_COLUMN_NAME, "b", "c"]; + let selected_key = sort_key.selected_sort_key(cols); + let mut expected_key = SortKey::with_capacity(3); + expected_key.with_col_opts("b", true, false); + expected_key.with_col_opts("c", false, false); + expected_key.with_col(TIME_COLUMN_NAME); + assert_eq!(selected_key, expected_key); + } } diff --git a/query/src/lib.rs b/query/src/lib.rs index 93ee9fe6ed..9e3b61a2aa 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -147,8 +147,8 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync { /// Returns the sort key of the chunk if any fn sort_key(&self) -> Option>; - /// Sets sort key for the schema of this chunk - fn set_sort_key(&mut self, sort_key: &SortKey<'_>); + /// Returns 
the chunk type (e.g. MUB, RUB, or OS) + fn chunk_type(&self) -> &str; } #[async_trait] diff --git a/query/src/provider.rs b/query/src/provider.rs index 9052478f66..17d19c9b55 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -20,7 +20,7 @@ use datafusion::{ }, }; use internal_types::schema::{merge::SchemaMerger, sort::SortKey, Schema}; -use observability_deps::tracing::{debug, trace}; +use observability_deps::tracing::{debug, info, trace}; use crate::{ compute_sort_key, @@ -688,7 +688,19 @@ impl Deduplicater { trace!(ChunkID=?chunk.id(), "Chunk is sorted and no need the sort operator"); return Ok(input); } + } else { + // The chunk is sorted, but in a different order from the super sort key. + // Log it so the data set can be investigated to improve performance further + info!(chunk_type=?chunk.chunk_type(), + chunk_ID=?chunk.id(), + chunk_current_sort_order=?chunk_sort_key, + chunk_super_sort_key=?super_sort_key, + "Chunk will get resorted in build_sort_plan due to new cardinality rate between key columns"); } + } else { + info!(chunk_type=?chunk.chunk_type(), + chunk_ID=?chunk.id(), + "Chunk is not yet sorted and will get sorted in build_sort_plan"); } // Build the chunk's sort key that is a subset of the super_sort_key @@ -696,10 +708,15 @@ impl Deduplicater { // First get the chunk pk columns let schema = chunk.schema(); let key_columns = schema.primary_key(); - trace!(pk_columns=?key_columns, "PK columns of the chunk that have not been sorted yet"); // Now get the key subset of the super key that includes the chunk's pk columns - let chunk_sort_key = super_sort_key.selected_sort_key(key_columns); + let chunk_sort_key = super_sort_key.selected_sort_key(key_columns.clone()); + + info!(chunk_type=?chunk.chunk_type(), + chunk_ID=?chunk.id(), + pk_columns=?key_columns, + sort_key=?chunk_sort_key, + "Chunk is getting sorted"); // Build arrow sort expression for the chunk sort key let input_schema = input.schema(); @@ -707,9 +724,6 @@ impl Deduplicater { 
trace!(Sort_Exprs=?sort_exprs, Chunk_ID=?chunk.id(), "Sort Expression for the sort operator of chunk"); - // The chunk must be sorted after this, set sort key for it - // Todo: chunk.set_sort_key(&chunk_sort_key); - // Create SortExec operator Ok(Arc::new( SortExec::try_new(sort_exprs, input).context(InternalSort)?, @@ -934,15 +948,12 @@ mod test { 5, Some(NonZeroU64::new(5).unwrap()), ) - // Actual distinct count of tag1 is 3 but make it 2 here to have it different - // from the one of tag2 to have deterministic column sort order to return - // deterministic sorted results .with_tag_column_with_full_stats( "tag1", Some("AL"), Some("MT"), 5, - Some(NonZeroU64::new(2).unwrap()), + Some(NonZeroU64::new(3).unwrap()), ) .with_tag_column_with_full_stats( "tag2", @@ -1017,15 +1028,12 @@ mod test { 5, Some(NonZeroU64::new(5).unwrap()), ) - // Actual distinct count of tag1 is 3 but make it 2 here to have it different - // from the one of tag2 to have deterministic column sort order to return - // deterministic sorted results .with_tag_column_with_full_stats( "tag1", Some("AL"), Some("MT"), 5, - Some(NonZeroU64::new(2).unwrap()), + Some(NonZeroU64::new(3).unwrap()), ) .with_tag_column_with_full_stats( "tag2", @@ -1083,15 +1091,12 @@ mod test { 5, Some(NonZeroU64::new(5).unwrap()), ) - // Actual distinct count of tag1 is 3 but make it 2 here to have it different - // from the one of tag2 to have deterministic column sort order to return - // deterministic sorted results .with_tag_column_with_full_stats( "tag1", Some("AL"), Some("MT"), 5, - Some(NonZeroU64::new(2).unwrap()), + Some(NonZeroU64::new(3).unwrap()), ) .with_tag_column_with_full_stats( "tag2", @@ -1114,15 +1119,12 @@ mod test { 5, Some(NonZeroU64::new(5).unwrap()), ) - // Actual distinct count of tag1 is 3 but make it 2 here to have it different - // from the one of tag2 to have deterministic column sort order to return - // deterministic sorted results .with_tag_column_with_full_stats( "tag1", Some("AL"), 
Some("MT"), 5, - Some(NonZeroU64::new(2).unwrap()), + Some(NonZeroU64::new(3).unwrap()), ) .with_tag_column_with_full_stats( "tag2", @@ -1195,15 +1197,12 @@ mod test { 5, Some(NonZeroU64::new(5).unwrap()), ) - // Actual distinct count of tag1 is 3 but make it 2 here to have it different - // from the one of tag2 to have deterministic column sort order to return - // deterministic sorted results .with_tag_column_with_full_stats( "tag1", Some("AL"), Some("MT"), 5, - Some(NonZeroU64::new(2).unwrap()), + Some(NonZeroU64::new(3).unwrap()), ) .with_tag_column_with_full_stats( "tag2", @@ -1226,15 +1225,12 @@ mod test { 5, Some(NonZeroU64::new(5).unwrap()), ) - // Actual distinct count of tag1 is 3 but make it 2 here to have it different - // from the one of tag2 to have deterministic column sort order to return - // deterministic sorted results .with_tag_column_with_full_stats( "tag1", Some("AL"), Some("MT"), 5, - Some(NonZeroU64::new(2).unwrap()), + Some(NonZeroU64::new(3).unwrap()), ) .with_tag_column_with_full_stats( "tag2", @@ -1446,15 +1442,12 @@ mod test { 5, Some(NonZeroU64::new(5).unwrap()), ) - // Actual distinct count of tag1 is 3 but make it 2 here to have it different - // from the one of tag2 and tag3 to have deterministic column sort order to return - // deterministic sorted results .with_tag_column_with_full_stats( "tag1", Some("AL"), Some("MT"), 5, - Some(NonZeroU64::new(2).unwrap()), + Some(NonZeroU64::new(3).unwrap()), ) .with_tag_column_with_full_stats( "tag2", @@ -1942,13 +1935,12 @@ mod test { 3, Some(NonZeroU64::new(3).unwrap()), ) - // Actual distinct count of tag1 is 3 but make it 2 to avoid the same stats with time to have a deterministic test results .with_tag_column_with_full_stats( "tag1", Some("UT"), Some("WA"), 3, - Some(NonZeroU64::new(2).unwrap()), + Some(NonZeroU64::new(3).unwrap()), ) .with_i64_field_column("field_int") .with_three_rows_of_data(), @@ -1964,13 +1956,12 @@ mod test { 4, Some(NonZeroU64::new(3).unwrap()), ) - // Actual 
distinct count of tag1 is 3 but make it 2 to avoid the same stats with time to have a deterministic test results .with_tag_column_with_full_stats( "tag1", Some("UT"), Some("WA"), 4, - Some(NonZeroU64::new(2).unwrap()), + Some(NonZeroU64::new(3).unwrap()), ) .with_i64_field_column("field_int") .with_may_contain_pk_duplicates(true) diff --git a/query/src/test.rs b/query/src/test.rs index 58c9b45fed..f60f03e58e 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -819,11 +819,8 @@ impl QueryChunk for TestChunk { None } - /// Sets sort key for the schema of this chunk - fn set_sort_key(&mut self, sort_key: &SortKey<'_>) { - let mut schema_cloned = self.schema.as_ref().clone(); - schema_cloned.set_sort_key(sort_key); - self.schema = Arc::new(schema_cloned); + fn chunk_type(&self) -> &str { + "Test Chunk" } fn apply_predicate_to_metadata(&self, predicate: &Predicate) -> Result { diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 10e095308f..997030cb08 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -441,7 +441,7 @@ impl QueryChunk for DbChunk { /// However, since we current sorted data based on their cardinality (see compute_sort_key), /// 2 different chunks may be sorted on different order of key columns. 
fn is_sorted_on_pk(&self) -> bool { - self.schema().sort_key().is_some() + self.schema().is_sorted_on_pk() } /// Returns the sort key of the chunk if any @@ -449,27 +449,12 @@ impl QueryChunk for DbChunk { self.meta.schema.sort_key() } - /// Sets sort key for the schema of this chunk - fn set_sort_key(&mut self, _sort_key: &SortKey<'_>) { - - // todo - // trace!(sort_key=?sort_key, "Input sort key to set_sort_key"); - - // // Update schema of the DBChunk - // let mut schema_cloned = self.meta.schema.as_ref().clone(); - // schema_cloned.set_sort_key(sort_key); - - // self.meta = Arc::new(ChunkMetadata { - // table_summary: Arc::new(self.meta.table_summary.as_ref()), - // schema: Arc::new(schema_cloned) - // }); - - // Update schema of the chunk itself - // match &self.state { - // State::MutableBuffer { chunk, .. } => {} - // State::ReadBuffer { chunk, .. } => {} - // State::ParquetFile { chunk, .. } => {} - // } + fn chunk_type(&self) -> &str { + match &self.state { + State::MutableBuffer { .. } => "MUB", + State::ReadBuffer { .. } => "RUB", + State::ParquetFile { .. } => "OS", + } } }