diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs index b9c0c6f8ee..1a7bf3a2e2 100644 --- a/iox_query/src/provider/physical.rs +++ b/iox_query/src/provider/physical.rs @@ -20,7 +20,7 @@ use observability_deps::tracing::warn; use predicate::Predicate; use schema::{sort::SortKey, Schema}; use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, HashSet}, sync::Arc, }; @@ -52,7 +52,7 @@ impl ParquetChunkList { meta: ObjectMeta, output_sort_key: Option<&SortKey>, ) -> Self { - let sort_key = combine_sort_key(output_sort_key.cloned(), chunk.sort_key()); + let sort_key = combine_sort_key(output_sort_key.cloned(), chunk.sort_key(), chunk.schema()); Self { object_store_url, @@ -66,7 +66,7 @@ impl ParquetChunkList { fn add_parquet_file(&mut self, chunk: &Arc, meta: ObjectMeta) { self.chunks.push((meta, Arc::clone(chunk))); - self.sort_key = combine_sort_key(self.sort_key.take(), chunk.sort_key()); + self.sort_key = combine_sort_key(self.sort_key.take(), chunk.sort_key(), chunk.schema()); } } @@ -79,10 +79,23 @@ impl ParquetChunkList { fn combine_sort_key( existing_sort_key: Option, chunk_sort_key: Option<&SortKey>, + chunk_schema: &Schema, ) -> Option { if let (Some(existing_sort_key), Some(chunk_sort_key)) = (existing_sort_key, chunk_sort_key) { let combined_sort_key = SortKey::try_merge_key(&existing_sort_key, chunk_sort_key); + if let Some(combined_sort_key) = combined_sort_key { + let chunk_cols = chunk_schema + .iter() + .map(|(_t, field)| field.name().as_str()) + .collect::>(); + for (col, _opts) in combined_sort_key.iter() { + if !chunk_sort_key.contains(col.as_ref()) && chunk_cols.contains(col.as_ref()) { + return None; + } + } + } + // Avoid cloning the sort key when possible, as the sort key // is likely to commonly be the same match combined_sort_key { @@ -249,7 +262,7 @@ where #[cfg(test)] mod tests { - use schema::sort::SortKeyBuilder; + use schema::{sort::SortKeyBuilder, SchemaBuilder, TIME_COLUMN_NAME}; use crate::{ test::{format_execution_plan, TestChunk}, @@ -271,48 +284,79 @@ mod tests { #[test] fn test_combine_sort_key() { + let schema_t1 = SchemaBuilder::new().tag("t1").timestamp().build().unwrap(); let skey_t1 = SortKeyBuilder::new() .with_col("t1") - .with_col("time") + .with_col(TIME_COLUMN_NAME) .build(); + let schema_t1_t2 = SchemaBuilder::new() + .tag("t1") + .tag("t2") + .timestamp() + .build() + .unwrap(); let skey_t1_t2 = SortKeyBuilder::new() .with_col("t1") .with_col("t2") - .with_col("time") + .with_col(TIME_COLUMN_NAME) .build(); let skey_t2_t1 = SortKeyBuilder::new() .with_col("t2") .with_col("t1") - .with_col("time") + .with_col(TIME_COLUMN_NAME) .build(); - assert_eq!(combine_sort_key(None, None), None); - assert_eq!(combine_sort_key(Some(skey_t1.clone()), None), None); - assert_eq!(combine_sort_key(None, Some(&skey_t1)), None); - + // output is None if any of the parameters is None (either no sort key requested or chunk is unsorted) + assert_eq!(combine_sort_key(None, None, &schema_t1), None); assert_eq!( - combine_sort_key(Some(skey_t1.clone()), Some(&skey_t1)), + combine_sort_key(Some(skey_t1.clone()), None, &schema_t1), + None + ); + assert_eq!(combine_sort_key(None, Some(&skey_t1), &schema_t1), None); + + // keeping sort key identical works + assert_eq!( + combine_sort_key(Some(skey_t1.clone()), Some(&skey_t1), &schema_t1), + Some(skey_t1.clone()) + ); + assert_eq!( + combine_sort_key(Some(skey_t1.clone()), Some(&skey_t1), &schema_t1_t2), Some(skey_t1.clone()) ); + // extending sort key works (chunk has more columns than existing key) assert_eq!( - combine_sort_key(Some(skey_t1.clone()), Some(&skey_t1_t2)), + combine_sort_key(Some(skey_t1.clone()), Some(&skey_t1_t2), &schema_t1_t2), Some(skey_t1_t2.clone()) ); + // extending sort key works (quorum has more columns than this chunk) assert_eq!( - combine_sort_key(Some(skey_t1_t2.clone()), Some(&skey_t1)), + combine_sort_key(Some(skey_t1_t2.clone()), Some(&skey_t1), &schema_t1), Some(skey_t1_t2.clone()) ); - assert_eq!( - combine_sort_key(Some(skey_t2_t1.clone()), Some(&skey_t1)), + combine_sort_key(Some(skey_t2_t1.clone()), Some(&skey_t1), &schema_t1), Some(skey_t2_t1.clone()) ); - assert_eq!(combine_sort_key(Some(skey_t2_t1), Some(&skey_t1_t2)), None); + // extending does not work if quorum covers columns that the chunk has but that are NOT sorted for that chunk + assert_eq!( + combine_sort_key(Some(skey_t1_t2.clone()), Some(&skey_t1), &schema_t1_t2), + None + ); + assert_eq!( + combine_sort_key(Some(skey_t2_t1.clone()), Some(&skey_t1), &schema_t1_t2), + None + ); + + // column order conflicts are detected + assert_eq!( + combine_sort_key(Some(skey_t2_t1), Some(&skey_t1_t2), &schema_t1_t2), + None + ); } #[test]