refactor: address review comments and add useful log info to catch resort

pull/24376/head
Nga Tran 2021-07-15 15:39:12 -04:00
parent 3bb32594ba
commit cfe0bfa88b
6 changed files with 201 additions and 67 deletions

@@ -251,6 +251,23 @@ impl Schema {
self.inner = Arc::new(new_schema);
}
/// Returns true if the sort_key includes all primary key cols
pub fn is_sorted_on_pk(&self) -> bool {
if let Some(sort_key) = self.sort_key() {
let key_columns = self.primary_key();
for key_col in key_columns {
if sort_key.get(key_col).is_none() {
return false; // pk col is not part of the sort key
}
}
true
} else {
// not sorted yet
false
}
}
/// Provide a reference to the underlying Arrow Schema object
pub fn inner(&self) -> &ArrowSchemaRef {
&self.inner
@@ -1295,4 +1312,86 @@ mod test {
}
} if &column_name == "time" ));
}
#[test]
fn test_is_sort_on_pk() {
// Sort key the same as pk
let mut sort_key = SortKey::with_capacity(3);
sort_key.with_col("tag4");
sort_key.with_col("tag3");
sort_key.with_col("tag2");
sort_key.with_col("tag1");
sort_key.with_col(TIME_COLUMN_NAME);
let schema = SchemaBuilder::new()
.influx_field("the_field", String)
.tag("tag1")
.tag("tag2")
.tag("tag3")
.tag("tag4")
.timestamp()
.measurement("the_measurement")
.build_with_sort_key(&sort_key)
.unwrap();
assert!(schema.is_sorted_on_pk());
// Sort key does not include all pk cols
let mut sort_key = SortKey::with_capacity(3);
sort_key.with_col("tag3");
sort_key.with_col("tag1");
sort_key.with_col(TIME_COLUMN_NAME);
let schema = SchemaBuilder::new()
.influx_field("the_field", String)
.tag("tag1")
.tag("tag2")
.tag("tag3")
.tag("tag4")
.timestamp()
.measurement("the_measurement")
.build_with_sort_key(&sort_key)
.unwrap();
assert!(!schema.is_sorted_on_pk());
// No sort key
let schema = SchemaBuilder::new()
.influx_field("the_field", String)
.tag("tag1")
.tag("tag2")
.tag("tag3")
.tag("tag4")
.timestamp()
.measurement("the_measurement")
.build()
.unwrap();
assert!(!schema.is_sorted_on_pk());
// No PK, no sort key
let schema = SchemaBuilder::new()
.influx_field("the_field", String)
.measurement("the_measurement")
.build()
.unwrap();
assert!(!schema.is_sorted_on_pk());
// Sort key only on a non-pk column
let mut sort_key = SortKey::with_capacity(3);
sort_key.with_col("the_field");
let schema = SchemaBuilder::new()
.influx_field("the_field", String)
.tag("tag1")
.tag("tag2")
.tag("tag3")
.tag("tag4")
.timestamp()
.measurement("the_measurement")
.build_with_sort_key(&sort_key)
.unwrap();
assert!(!schema.is_sorted_on_pk());
}
}
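For context (not part of this diff): the point of is_sorted_on_pk is that merely having a sort key is weaker than being sorted on every primary-key column, which is why DbChunk::is_sorted_on_pk further down stops checking sort_key().is_some(). A minimal, self-contained sketch of that distinction, with plain string slices standing in for the real Schema and SortKey types:

// Old check: any sort key at all counts as "sorted on pk".
fn old_check(sort_key: Option<&[&str]>) -> bool {
    sort_key.is_some()
}

// New check (paraphrasing Schema::is_sorted_on_pk above): every primary-key
// column must appear in the sort key.
fn new_check(sort_key: Option<&[&str]>, pk_cols: &[&str]) -> bool {
    match sort_key {
        Some(key) => pk_cols.iter().all(|col| key.contains(col)),
        None => false,
    }
}

fn main() {
    let pk = ["tag1", "tag2", "time"];
    // Sorted only on (tag1, time): the old check passes, the new one does not.
    let partial = ["tag1", "time"];
    assert!(old_check(Some(&partial[..])));
    assert!(!new_check(Some(&partial[..]), &pk));
    // Sorted on all primary-key columns (order may differ): the new check passes.
    let full = ["tag2", "tag1", "time"];
    assert!(new_check(Some(&full[..]), &pk));
}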

@@ -496,4 +496,66 @@ mod tests {
let merge_key = SortKey::try_merge_key(&key_abc_2, &key_bac_2);
assert_eq!(merge_key, None);
}
#[test]
fn test_selected_sort_key() {
let mut sort_key = SortKey::with_capacity(4);
sort_key.with_col("a"); // default sort option
sort_key.with_col_opts("b", true, false);
sort_key.with_col_opts("c", false, false);
sort_key.with_col(TIME_COLUMN_NAME);
// input cols are empty -> nothing selected
let cols = vec![];
let selected_key = sort_key.selected_sort_key(cols);
assert!(selected_key.is_empty());
// input cols are not part of the key -> nothing selected
let cols = vec!["d", "e"];
let selected_key = sort_key.selected_sort_key(cols);
assert!(selected_key.is_empty());
// input cols exactly the same and in the same order -> exact sort_key selected
let cols = vec!["a", "b", "c", TIME_COLUMN_NAME];
let selected_key = sort_key.selected_sort_key(cols);
assert_eq!(selected_key, sort_key);
// input cols exactly the same but in different order -> exact sort_key selected
let cols = vec!["c", TIME_COLUMN_NAME, "b", "a"];
let selected_key = sort_key.selected_sort_key(cols);
assert_eq!(selected_key, sort_key);
// input cols are a subset in the same order -> subset selected
let cols = vec!["a", "b"];
let selected_key = sort_key.selected_sort_key(cols);
let mut expected_key = SortKey::with_capacity(2);
expected_key.with_col("a"); // default sort option
expected_key.with_col_opts("b", true, false);
assert_eq!(selected_key, expected_key);
// input cols are a single-column subset -> subset selected
let cols = vec![TIME_COLUMN_NAME];
let selected_key = sort_key.selected_sort_key(cols);
let mut expected_key = SortKey::with_capacity(1);
expected_key.with_col(TIME_COLUMN_NAME);
assert_eq!(selected_key, expected_key);
// input cols are a subset in the same order, with a gap -> subset selected
let cols = vec!["a", "c", TIME_COLUMN_NAME];
let selected_key = sort_key.selected_sort_key(cols);
let mut expected_key = SortKey::with_capacity(3);
expected_key.with_col("a"); // default sort option
expected_key.with_col_opts("c", false, false);
expected_key.with_col(TIME_COLUMN_NAME);
assert_eq!(selected_key, expected_key);
// input cols are a subset in a different order -> subset selected in sort_key order
let cols = vec![TIME_COLUMN_NAME, "b", "c"];
let selected_key = sort_key.selected_sort_key(cols);
let mut expected_key = SortKey::with_capacity(3);
expected_key.with_col_opts("b", true, false);
expected_key.with_col_opts("c", false, false);
expected_key.with_col(TIME_COLUMN_NAME);
assert_eq!(selected_key, expected_key);
}
}
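For orientation (not part of this diff): a self-contained paraphrase of the selection rule these tests exercise, with plain string slices standing in for SortKey entries and their per-column sort options. The selected key keeps the super key's own column order and drops requested columns that are not in the key; the order of the requested columns does not matter:

// Paraphrase of the rule exercised by test_selected_sort_key above (simplified:
// real SortKey entries also carry the descending/nulls_first options).
fn selected<'a>(super_key: &[&'a str], cols: &[&'a str]) -> Vec<&'a str> {
    super_key
        .iter()
        .copied()
        .filter(|col| cols.contains(col))
        .collect()
}

fn main() {
    let super_key = ["a", "b", "c", "time"];
    // Requested order is irrelevant; the super key's order wins.
    assert_eq!(selected(&super_key, &["time", "b", "c"]), vec!["b", "c", "time"]);
    // Columns that are not part of the key select nothing.
    assert_eq!(selected(&super_key, &["d", "e"]), Vec::<&str>::new());
}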

@@ -147,8 +147,8 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
/// Returns the sort key of the chunk if any
fn sort_key(&self) -> Option<SortKey<'_>>;
/// Sets sort key for the schema of this chunk
fn set_sort_key(&mut self, sort_key: &SortKey<'_>);
/// Returns the chunk type, which is one of MUB, RUB, or OS
fn chunk_type(&self) -> &str;
}
#[async_trait]

@@ -20,7 +20,7 @@ use datafusion::{
},
};
use internal_types::schema::{merge::SchemaMerger, sort::SortKey, Schema};
use observability_deps::tracing::{debug, trace};
use observability_deps::tracing::{debug, info, trace};
use crate::{
compute_sort_key,
@@ -688,7 +688,19 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
trace!(ChunkID=?chunk.id(), "Chunk is sorted and does not need the sort operator");
return Ok(input);
}
} else {
// The chunk is sorted, but in a different order than the super sort key.
// Log it to help investigate the data set and improve performance further
info!(chunk_type=?chunk.chunk_type(),
chunk_ID=?chunk.id(),
chunk_current_sort_order=?chunk_sort_key,
chunk_super_sort_key=?super_sort_key,
"Chunk will get resorted in build_sort_plan due to new cardinality rate between key columns");
}
} else {
info!(chunk_type=?chunk.chunk_type(),
chunk_ID=?chunk.id(),
"Chunk is not yet sorted and will get sorted in build_sort_plan");
}
// Build the chunk's sort key that is a subset of the super_sort_key
@@ -696,10 +708,15 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
// First get the chunk pk columns
let schema = chunk.schema();
let key_columns = schema.primary_key();
trace!(pk_columns=?key_columns, "PK columns of the chunk that has not been sorted yet");
// Now get the key subset of the super key that includes the chunk's pk columns
let chunk_sort_key = super_sort_key.selected_sort_key(key_columns);
let chunk_sort_key = super_sort_key.selected_sort_key(key_columns.clone());
info!(chunk_type=?chunk.chunk_type(),
chunk_ID=?chunk.id(),
pk_columns=?key_columns,
sort_key=?chunk_sort_key,
"Chunk is getting sorted");
// Build arrow sort expression for the chunk sort key
let input_schema = input.schema();
@@ -707,9 +724,6 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
trace!(Sort_Exprs=?sort_exprs, Chunk_ID=?chunk.id(), "Sort Expression for the sort operator of chunk");
// The chunk must be sorted after this, set sort key for it
// Todo: chunk.set_sort_key(&chunk_sort_key);
// Create SortExec operator
Ok(Arc::new(
SortExec::try_new(sort_exprs, input).context(InternalSort)?,
@@ -934,15 +948,12 @@ mod test {
5,
Some(NonZeroU64::new(5).unwrap()),
)
// Actual distinct count of tag1 is 3 but make it 2 here to have it different
// from the one of tag2 to have deterministic column sort order to return
// deterministic sorted results
.with_tag_column_with_full_stats(
"tag1",
Some("AL"),
Some("MT"),
5,
Some(NonZeroU64::new(2).unwrap()),
Some(NonZeroU64::new(3).unwrap()),
)
.with_tag_column_with_full_stats(
"tag2",
@@ -1017,15 +1028,12 @@ mod test {
5,
Some(NonZeroU64::new(5).unwrap()),
)
// Actual distinct count of tag1 is 3 but make it 2 here to have it different
// from the one of tag2 to have deterministic column sort order to return
// deterministic sorted results
.with_tag_column_with_full_stats(
"tag1",
Some("AL"),
Some("MT"),
5,
Some(NonZeroU64::new(2).unwrap()),
Some(NonZeroU64::new(3).unwrap()),
)
.with_tag_column_with_full_stats(
"tag2",
@@ -1083,15 +1091,12 @@ mod test {
5,
Some(NonZeroU64::new(5).unwrap()),
)
// Actual distinct count of tag1 is 3 but make it 2 here to have it different
// from the one of tag2 to have deterministic column sort order to return
// deterministic sorted results
.with_tag_column_with_full_stats(
"tag1",
Some("AL"),
Some("MT"),
5,
Some(NonZeroU64::new(2).unwrap()),
Some(NonZeroU64::new(3).unwrap()),
)
.with_tag_column_with_full_stats(
"tag2",
@@ -1114,15 +1119,12 @@ mod test {
5,
Some(NonZeroU64::new(5).unwrap()),
)
// Actual distinct count of tag1 is 3 but make it 2 here to have it different
// from the one of tag2 to have deterministic column sort order to return
// deterministic sorted results
.with_tag_column_with_full_stats(
"tag1",
Some("AL"),
Some("MT"),
5,
Some(NonZeroU64::new(2).unwrap()),
Some(NonZeroU64::new(3).unwrap()),
)
.with_tag_column_with_full_stats(
"tag2",
@@ -1195,15 +1197,12 @@ mod test {
5,
Some(NonZeroU64::new(5).unwrap()),
)
// Actual distinct count of tag1 is 3 but make it 2 here to have it different
// from the one of tag2 to have deterministic column sort order to return
// deterministic sorted results
.with_tag_column_with_full_stats(
"tag1",
Some("AL"),
Some("MT"),
5,
Some(NonZeroU64::new(2).unwrap()),
Some(NonZeroU64::new(3).unwrap()),
)
.with_tag_column_with_full_stats(
"tag2",
@@ -1226,15 +1225,12 @@ mod test {
5,
Some(NonZeroU64::new(5).unwrap()),
)
// Actual distinct count of tag1 is 3 but make it 2 here to have it different
// from the one of tag2 to have deterministic column sort order to return
// deterministic sorted results
.with_tag_column_with_full_stats(
"tag1",
Some("AL"),
Some("MT"),
5,
Some(NonZeroU64::new(2).unwrap()),
Some(NonZeroU64::new(3).unwrap()),
)
.with_tag_column_with_full_stats(
"tag2",
@@ -1446,15 +1442,12 @@ mod test {
5,
Some(NonZeroU64::new(5).unwrap()),
)
// Actual distinct count of tag1 is 3 but make it 2 here to have it different
// from the one of tag2 and tag3 to have deterministic column sort order to return
// deterministic sorted results
.with_tag_column_with_full_stats(
"tag1",
Some("AL"),
Some("MT"),
5,
Some(NonZeroU64::new(2).unwrap()),
Some(NonZeroU64::new(3).unwrap()),
)
.with_tag_column_with_full_stats(
"tag2",
@@ -1942,13 +1935,12 @@ mod test {
3,
Some(NonZeroU64::new(3).unwrap()),
)
// Actual distinct count of tag1 is 3 but make it 2 to avoid the same stats with time to have a deterministic test results
.with_tag_column_with_full_stats(
"tag1",
Some("UT"),
Some("WA"),
3,
Some(NonZeroU64::new(2).unwrap()),
Some(NonZeroU64::new(3).unwrap()),
)
.with_i64_field_column("field_int")
.with_three_rows_of_data(),
@@ -1964,13 +1956,12 @@ mod test {
4,
Some(NonZeroU64::new(3).unwrap()),
)
// Actual distinct count of tag1 is 3 but make it 2 to avoid the same stats with time to have a deterministic test results
.with_tag_column_with_full_stats(
"tag1",
Some("UT"),
Some("WA"),
4,
Some(NonZeroU64::new(2).unwrap()),
Some(NonZeroU64::new(3).unwrap()),
)
.with_i64_field_column("field_int")
.with_may_contain_pk_duplicates(true)

@@ -819,11 +819,8 @@ impl QueryChunk for TestChunk {
None
}
/// Sets sort key for the schema of this chunk
fn set_sort_key(&mut self, sort_key: &SortKey<'_>) {
let mut schema_cloned = self.schema.as_ref().clone();
schema_cloned.set_sort_key(sort_key);
self.schema = Arc::new(schema_cloned);
fn chunk_type(&self) -> &str {
"Test Chunk"
}
fn apply_predicate_to_metadata(&self, predicate: &Predicate) -> Result<PredicateMatch> {

@@ -441,7 +441,7 @@ impl QueryChunk for DbChunk {
/// However, since we currently sort data based on cardinality (see compute_sort_key),
/// 2 different chunks may be sorted on different order of key columns.
fn is_sorted_on_pk(&self) -> bool {
self.schema().sort_key().is_some()
self.schema().is_sorted_on_pk()
}
/// Returns the sort key of the chunk if any
@@ -449,27 +449,12 @@
self.meta.schema.sort_key()
}
/// Sets sort key for the schema of this chunk
fn set_sort_key(&mut self, _sort_key: &SortKey<'_>) {
// todo
// trace!(sort_key=?sort_key, "Input sort key to set_sort_key");
// // Update schema of the DBChunk
// let mut schema_cloned = self.meta.schema.as_ref().clone();
// schema_cloned.set_sort_key(sort_key);
// self.meta = Arc::new(ChunkMetadata {
// table_summary: Arc::new(self.meta.table_summary.as_ref()),
// schema: Arc::new(schema_cloned)
// });
// Update schema of the chunk itself
// match &self.state {
// State::MutableBuffer { chunk, .. } => {}
// State::ReadBuffer { chunk, .. } => {}
// State::ParquetFile { chunk, .. } => {}
// }
fn chunk_type(&self) -> &str {
match &self.state {
State::MutableBuffer { .. } => "MUB",
State::ReadBuffer { .. } => "RUB",
State::ParquetFile { .. } => "OS",
}
}
}
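Stepping back (not part of the diff): the new info! lines in build_sort_plan, the is_sorted_on_pk check, and chunk_type together make a three-way decision observable for every chunk. A self-contained paraphrase of that decision, with an exact-match comparison standing in for the real subset/order check and plain string slices standing in for SortKey:

#[derive(Debug, PartialEq)]
enum SortDecision {
    AlreadySorted, // chunk is sorted the way the plan needs it: no SortExec added
    Resort,        // sorted, but in a different order than the super sort key: logged, then re-sorted
    Sort,          // not sorted yet: logged, then sorted
}

// Simplified stand-in for the checks in build_sort_plan; the real code compares the
// chunk's sort key against the super sort key rather than testing plain equality.
fn decide(chunk_sort_key: Option<&[&str]>, needed_key: &[&str]) -> SortDecision {
    match chunk_sort_key {
        Some(key) if key == needed_key => SortDecision::AlreadySorted,
        Some(_) => SortDecision::Resort,
        None => SortDecision::Sort,
    }
}

fn main() {
    let needed = ["tag1", "tag2", "time"];
    assert_eq!(decide(Some(&["tag1", "tag2", "time"][..]), &needed), SortDecision::AlreadySorted);
    assert_eq!(decide(Some(&["tag2", "tag1", "time"][..]), &needed), SortDecision::Resort);
    assert_eq!(decide(None, &needed), SortDecision::Sort);
}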