fix: check schema when calculating sorting for `ParquetExec` (#7136)

When combining sort keys, we have to check the schema of the chunk to
differentiate between "column does not exist within this chunk" and
"column exists but is not sorted".

This is unlikely an issue in prod at the moment (if there is not bug in
the ingester or compactor), but this was found while working on tests
for #6098. Overall this should improve robustness.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2023-03-07 10:20:31 +01:00 committed by GitHub
parent 075264057f
commit 91471fe568
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 61 additions and 17 deletions

View File

@ -20,7 +20,7 @@ use observability_deps::tracing::warn;
use predicate::Predicate;
use schema::{sort::SortKey, Schema};
use std::{
collections::{hash_map::Entry, HashMap},
collections::{hash_map::Entry, HashMap, HashSet},
sync::Arc,
};
@ -52,7 +52,7 @@ impl ParquetChunkList {
meta: ObjectMeta,
output_sort_key: Option<&SortKey>,
) -> Self {
let sort_key = combine_sort_key(output_sort_key.cloned(), chunk.sort_key());
let sort_key = combine_sort_key(output_sort_key.cloned(), chunk.sort_key(), chunk.schema());
Self {
object_store_url,
@ -66,7 +66,7 @@ impl ParquetChunkList {
fn add_parquet_file(&mut self, chunk: &Arc<dyn QueryChunk>, meta: ObjectMeta) {
self.chunks.push((meta, Arc::clone(chunk)));
self.sort_key = combine_sort_key(self.sort_key.take(), chunk.sort_key());
self.sort_key = combine_sort_key(self.sort_key.take(), chunk.sort_key(), chunk.schema());
}
}
@ -79,10 +79,23 @@ impl ParquetChunkList {
fn combine_sort_key(
existing_sort_key: Option<SortKey>,
chunk_sort_key: Option<&SortKey>,
chunk_schema: &Schema,
) -> Option<SortKey> {
if let (Some(existing_sort_key), Some(chunk_sort_key)) = (existing_sort_key, chunk_sort_key) {
let combined_sort_key = SortKey::try_merge_key(&existing_sort_key, chunk_sort_key);
if let Some(combined_sort_key) = combined_sort_key {
let chunk_cols = chunk_schema
.iter()
.map(|(_t, field)| field.name().as_str())
.collect::<HashSet<_>>();
for (col, _opts) in combined_sort_key.iter() {
if !chunk_sort_key.contains(col.as_ref()) && chunk_cols.contains(col.as_ref()) {
return None;
}
}
}
// Avoid cloning the sort key when possible, as the sort key
// is likely to commonly be the same
match combined_sort_key {
@ -249,7 +262,7 @@ where
#[cfg(test)]
mod tests {
use schema::sort::SortKeyBuilder;
use schema::{sort::SortKeyBuilder, SchemaBuilder, TIME_COLUMN_NAME};
use crate::{
test::{format_execution_plan, TestChunk},
@ -271,48 +284,79 @@ mod tests {
#[test]
fn test_combine_sort_key() {
let schema_t1 = SchemaBuilder::new().tag("t1").timestamp().build().unwrap();
let skey_t1 = SortKeyBuilder::new()
.with_col("t1")
.with_col("time")
.with_col(TIME_COLUMN_NAME)
.build();
let schema_t1_t2 = SchemaBuilder::new()
.tag("t1")
.tag("t2")
.timestamp()
.build()
.unwrap();
let skey_t1_t2 = SortKeyBuilder::new()
.with_col("t1")
.with_col("t2")
.with_col("time")
.with_col(TIME_COLUMN_NAME)
.build();
let skey_t2_t1 = SortKeyBuilder::new()
.with_col("t2")
.with_col("t1")
.with_col("time")
.with_col(TIME_COLUMN_NAME)
.build();
assert_eq!(combine_sort_key(None, None), None);
assert_eq!(combine_sort_key(Some(skey_t1.clone()), None), None);
assert_eq!(combine_sort_key(None, Some(&skey_t1)), None);
// output is None if any of the parameters is None (either no sort key requested or chunk is unsorted)
assert_eq!(combine_sort_key(None, None, &schema_t1), None);
assert_eq!(
combine_sort_key(Some(skey_t1.clone()), Some(&skey_t1)),
combine_sort_key(Some(skey_t1.clone()), None, &schema_t1),
None
);
assert_eq!(combine_sort_key(None, Some(&skey_t1), &schema_t1), None);
// keeping sort key identical works
assert_eq!(
combine_sort_key(Some(skey_t1.clone()), Some(&skey_t1), &schema_t1),
Some(skey_t1.clone())
);
assert_eq!(
combine_sort_key(Some(skey_t1.clone()), Some(&skey_t1), &schema_t1_t2),
Some(skey_t1.clone())
);
// extending sort key works (chunk has more columns than existing key)
assert_eq!(
combine_sort_key(Some(skey_t1.clone()), Some(&skey_t1_t2)),
combine_sort_key(Some(skey_t1.clone()), Some(&skey_t1_t2), &schema_t1_t2),
Some(skey_t1_t2.clone())
);
// extending sort key works (quorum has more columns than this chunk)
assert_eq!(
combine_sort_key(Some(skey_t1_t2.clone()), Some(&skey_t1)),
combine_sort_key(Some(skey_t1_t2.clone()), Some(&skey_t1), &schema_t1),
Some(skey_t1_t2.clone())
);
assert_eq!(
combine_sort_key(Some(skey_t2_t1.clone()), Some(&skey_t1)),
combine_sort_key(Some(skey_t2_t1.clone()), Some(&skey_t1), &schema_t1),
Some(skey_t2_t1.clone())
);
assert_eq!(combine_sort_key(Some(skey_t2_t1), Some(&skey_t1_t2)), None);
// extending does not work if quorum covers columns that the chunk has but that are NOT sorted for that chunk
assert_eq!(
combine_sort_key(Some(skey_t1_t2.clone()), Some(&skey_t1), &schema_t1_t2),
None
);
assert_eq!(
combine_sort_key(Some(skey_t2_t1.clone()), Some(&skey_t1), &schema_t1_t2),
None
);
// column order conflicts are detected
assert_eq!(
combine_sort_key(Some(skey_t2_t1), Some(&skey_t1_t2), &schema_t1_t2),
None
);
}
#[test]