chore: make terminology in iox_query::Provider consistent (remove super notation) (#5349)

* chore: make terminology in iox_query::Provider consistent (remove super notation)

* refactor: be more specific about *which* sort key is meant

* refactor: rename another sort_key --> output_sort_key

* refactor: rename additional sort_key to output_sort_key

* refactor: rename sort_key --> chunk_sort_key

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-08-10 06:59:47 -04:00 committed by GitHub
parent ee2013ce52
commit ce3e2c3a15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 66 additions and 58 deletions

View File

@ -90,7 +90,7 @@ pub struct ScanPlanBuilder<'a> {
table_schema: Arc<Schema>,
chunks: Vec<Arc<dyn QueryChunk>>,
/// The sort key that describes the desired output sort order
sort_key: Option<SortKey>,
output_sort_key: Option<SortKey>,
predicate: Option<&'a Predicate>,
}
@ -101,7 +101,7 @@ impl<'a> ScanPlanBuilder<'a> {
table_name: None,
table_schema,
chunks: vec![],
sort_key: None,
output_sort_key: None,
predicate: None,
}
}
@ -115,9 +115,9 @@ impl<'a> ScanPlanBuilder<'a> {
/// Sets the desired output sort key. If the output of this plan
/// is not already sorted this way, it will be re-sorted to conform
/// to this key
pub fn with_sort_key(mut self, sort_key: SortKey) -> Self {
assert!(self.sort_key.is_none());
self.sort_key = Some(sort_key);
pub fn with_output_sort_key(mut self, output_sort_key: SortKey) -> Self {
assert!(self.output_sort_key.is_none());
self.output_sort_key = Some(output_sort_key);
self
}
@ -134,7 +134,7 @@ impl<'a> ScanPlanBuilder<'a> {
ctx,
table_name,
chunks,
sort_key,
output_sort_key,
table_schema,
predicate,
} = self;
@ -151,9 +151,9 @@ impl<'a> ScanPlanBuilder<'a> {
// so no need to prune them
.add_no_op_pruner();
if let Some(sort_key) = sort_key {
if let Some(output_sort_key) = output_sort_key {
// Tell the scan of this provider to sort its output on the given output_sort_key
builder = builder.with_sort_key(sort_key);
builder = builder.with_output_sort_key(output_sort_key);
}
for chunk in chunks {

View File

@ -63,7 +63,7 @@ impl ReorgPlanner {
///
/// 1. Merges chunks together into a single stream
/// 2. Deduplicates via PK as necessary
/// 3. Sorts the result according to the requested key
/// 3. Sorts the result according to the requested `output_sort_key`
///
/// The plan looks like:
///
@ -73,14 +73,14 @@ impl ReorgPlanner {
&self,
schema: Arc<Schema>,
chunks: I,
sort_key: SortKey,
output_sort_key: SortKey,
) -> Result<LogicalPlan>
where
I: IntoIterator<Item = Arc<dyn QueryChunk>>,
{
let scan_plan = ScanPlanBuilder::new(schema, self.ctx.child_ctx("compact_plan"))
.with_chunks(chunks)
.with_sort_key(sort_key)
.with_output_sort_key(output_sort_key)
.build()
.context(BuildingScanSnafu)?;
@ -96,7 +96,7 @@ impl ReorgPlanner {
///
/// 1. Merges chunks together into a single stream
/// 2. Deduplicates via PK as necessary
/// 3. Sorts the result according to the requested key
/// 3. Sorts the result according to the requested `output_sort_key`
/// 4. Splits the stream on value of the `time` column: Those
/// rows that are on or before the time and those that are after
///
@ -146,7 +146,7 @@ impl ReorgPlanner {
&self,
schema: Arc<Schema>,
chunks: I,
sort_key: SortKey,
output_sort_key: SortKey,
split_times: Vec<i64>,
) -> Result<LogicalPlan>
where
@ -159,7 +159,7 @@ impl ReorgPlanner {
let scan_plan = ScanPlanBuilder::new(schema, self.ctx.child_ctx("split_plan"))
.with_chunks(chunks)
.with_sort_key(sort_key)
.with_output_sort_key(output_sort_key)
.build()
.context(BuildingScanSnafu)?;

View File

@ -45,7 +45,7 @@ pub trait QueryChunkMeta {
/// Return the schema of the data held in this chunk
fn schema(&self) -> Arc<Schema>;
/// Return a reference to the chunk's partition sort key if any
/// Return a reference to the chunk's partition sort key if any.
/// Only persisted chunks have a partition sort key
fn partition_sort_key(&self) -> Option<&SortKey>;

View File

@ -120,7 +120,7 @@ pub struct ProviderBuilder {
schema: Arc<Schema>,
chunk_pruner: Option<Arc<dyn ChunkPruner>>,
chunks: Vec<Arc<dyn QueryChunk>>,
sort_key: Option<SortKey>,
output_sort_key: Option<SortKey>,
// execution context used for tracing
ctx: IOxSessionContext,
@ -133,15 +133,15 @@ impl ProviderBuilder {
schema,
chunk_pruner: None,
chunks: Vec::new(),
sort_key: None,
output_sort_key: None,
ctx,
}
}
/// Produce sorted output
pub fn with_sort_key(self, sort_key: SortKey) -> Self {
/// Produce output sorted according to `output_sort_key`
pub fn with_output_sort_key(self, output_sort_key: SortKey) -> Self {
Self {
sort_key: Some(sort_key),
output_sort_key: Some(output_sort_key),
..self
}
}
@ -191,7 +191,7 @@ impl ProviderBuilder {
chunk_pruner,
table_name: self.table_name,
chunks: self.chunks,
sort_key: self.sort_key,
output_sort_key: self.output_sort_key,
ctx: self.ctx,
})
}
@ -210,8 +210,8 @@ pub struct ChunkTableProvider {
chunk_pruner: Arc<dyn ChunkPruner>,
/// The chunks
chunks: Vec<Arc<dyn QueryChunk>>,
/// The sort key if any
sort_key: Option<SortKey>,
/// The desired output sort key if any
output_sort_key: Option<SortKey>,
// execution context
ctx: IOxSessionContext,
@ -292,7 +292,7 @@ impl TableProvider for ChunkTableProvider {
scan_schema,
chunks,
predicate,
self.sort_key.clone(),
self.output_sort_key.clone(),
)?;
Ok(plan)
@ -639,14 +639,14 @@ impl Deduplicater {
Some(partition_sort_key)
}
// return true if the super_sort_key covers sort_key and same column order
fn sort_key_cover_and_same_order(super_sort_key: &SortKey, sort_key: &SortKey) -> bool {
if super_sort_key == sort_key {
// return true if `output_sort_key` covers `chunk_sort_key` and has the same column order
fn sort_key_cover_and_same_order(output_sort_key: &SortKey, chunk_sort_key: &SortKey) -> bool {
if output_sort_key == chunk_sort_key {
return true;
}
if let Some(merge_key) = SortKey::try_merge_key(super_sort_key, sort_key) {
if merge_key == super_sort_key {
if let Some(merge_key) = SortKey::try_merge_key(output_sort_key, chunk_sort_key) {
if merge_key == output_sort_key {
return true;
}
}
@ -751,7 +751,7 @@ impl Deduplicater {
output_schema: Arc<Schema>,
chunks: Vec<Arc<dyn QueryChunk>>, // These chunks are identified overlapped
predicate: Predicate,
sort_key: &SortKey,
output_sort_key: &SortKey,
) -> Result<Arc<dyn ExecutionPlan>> {
// Note that we may need to sort/deduplicate based on tag
// columns which do not appear in the output
@ -784,7 +784,7 @@ impl Deduplicater {
Arc::clone(&input_schema),
Arc::clone(chunk),
predicate.clone(),
Some(sort_key),
Some(output_sort_key),
)
})
.collect();
@ -795,7 +795,7 @@ impl Deduplicater {
let plan = UnionExec::new(sorted_chunk_plans?);
// Now (sort) merge the already sorted chunks
let sort_exprs = arrow_sort_key_exprs(sort_key, &plan.schema());
let sort_exprs = arrow_sort_key_exprs(output_sort_key, &plan.schema());
let plan = Arc::new(SortPreservingMergeExec::new(
sort_exprs.clone(),
@ -840,7 +840,7 @@ impl Deduplicater {
output_schema: Arc<Schema>,
chunk: Arc<dyn QueryChunk>, // This chunk is identified having duplicates
predicate: Predicate,
sort_key: &SortKey,
output_sort_key: &SortKey,
) -> Result<Arc<dyn ExecutionPlan>> {
let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]);
let input_schema = Self::compute_input_schema(&output_schema, &pk_schema);
@ -862,12 +862,12 @@ impl Deduplicater {
Arc::clone(&input_schema),
Arc::clone(&chunks[0]),
predicate,
Some(sort_key),
Some(output_sort_key),
)?;
// Add DeduplicateExec
// Sort exprs for the deduplication
let sort_exprs = arrow_sort_key_exprs(sort_key, &plan.schema());
let sort_exprs = arrow_sort_key_exprs(output_sort_key, &plan.schema());
debug!(?sort_exprs, chunk_id=?chunks[0].id(), "Sort Expression for the deduplicate node of chunk");
let plan = Self::add_deduplicate_node(sort_exprs, plan);
@ -977,7 +977,7 @@ impl Deduplicater {
output_schema: Arc<Schema>,
chunk: Arc<dyn QueryChunk>,
predicate: Predicate, // This is the select predicate of the query
sort_key: Option<&SortKey>,
output_sort_key: Option<&SortKey>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Add columns of sort key and delete predicates in the schema of to-be-scanned IOxReadFilterNode
// This is needed because columns in select query may not include them yet
@ -1000,7 +1000,7 @@ impl Deduplicater {
trace!(?chunk_schema, "chunk schema");
// Cols of sort key
if let Some(key) = sort_key {
if let Some(key) = output_sort_key {
for (t, field) in chunk_schema.iter() {
// Ignore columns present in sort key but not in chunk
if key.get(field.name()).is_some() {
@ -1055,16 +1055,16 @@ impl Deduplicater {
}
// Add the sort operator, SortExec, if needed
if let Some(sort_key) = sort_key {
if let Some(output_sort_key) = output_sort_key {
let mut add_sort_op = true;
if let Some(chunk_sort_key) = chunk.sort_key() {
if Self::sort_key_cover_and_same_order(sort_key, chunk_sort_key) {
if Self::sort_key_cover_and_same_order(output_sort_key, chunk_sort_key) {
// the chunk is already sorted
add_sort_op = false;
}
}
if add_sort_op {
input = Self::build_sort_plan(chunk, input, sort_key)?
input = Self::build_sort_plan(chunk, input, output_sort_key)?
}
}
@ -1082,35 +1082,39 @@ impl Deduplicater {
) -> Result<Arc<dyn ExecutionPlan>> {
// output_sort_key cannot be empty
if output_sort_key.is_empty() {
panic!("Super sort key is empty");
panic!("Output sort key is empty");
}
debug!(output_sort_key=?output_sort_key, "Super sort key input to build_sort_plan");
debug!(chunk_id=?chunk.id(), ?output_sort_key, "input to build_sort_plan");
// Check to see if the plan is already sorted on a subset of the output_sort_key
let sort_key = chunk.sort_key();
if let Some(chunk_sort_key) = sort_key {
let chunk_sort_key = chunk.sort_key();
if let Some(chunk_sort_key) = chunk_sort_key {
if let Some(merge_key) = SortKey::try_merge_key(output_sort_key, chunk_sort_key) {
if merge_key == output_sort_key {
// the chunk is already sorted on a subset of the output_sort_key,
// no need to re-sort it
debug!(ChunkID=?chunk.id(), "Chunk is sorted and no need the sort operator");
debug!(chunk_id=?chunk.id(), "Chunk is sorted and no need the sort operator");
return Ok(input);
}
} else {
// The chunk is sorted but not on different order with super sort key.
// Log it for investigating data set to improve performance further
// This behavior should not happen when ingester, compactor and querier use the sort key
// The chunk is sorted but in a different order than
// the requested output sort key.
// This is logged for additional investigation as the
// rest of IOx should not produce incompatible sort
// keys and it signals something is wrong with the
// ingester, compactor, or querier
warn!(chunk_type=?chunk.chunk_type(),
chunk_ID=?chunk.id(),
chunk_current_sort_order=?chunk_sort_key,
chunk_super_sort_key=?output_sort_key,
"Chunk will get resorted in build_sort_plan because it was sorted on different sort key");
chunk_id=?chunk.id(),
chunk_current_sort_key=?chunk_sort_key,
requested_output_sort_key=?output_sort_key,
"Chunk will be resorted in build_sort_plan because it was sorted on different sort key");
}
} else {
debug!(chunk_type=?chunk.chunk_type(),
chunk_ID=?chunk.id(),
"Chunk is not yet sorted and will get sorted in build_sort_plan");
chunk_id=?chunk.id(),
"Chunk is not yet sorted and will get sorted in build_sort_plan");
}
// Build arrow sort expression for the chunk sort key
@ -1133,7 +1137,7 @@ impl Deduplicater {
output_schema: Arc<Schema>,
chunk: Arc<dyn QueryChunk>, // This chunk is identified having no duplicates
predicate: Predicate,
sort_key: Option<&SortKey>,
output_sort_key: Option<&SortKey>,
) -> Result<Arc<dyn ExecutionPlan>> {
Self::build_sort_plan_for_read_filter(
ctx,
@ -1141,7 +1145,7 @@ impl Deduplicater {
output_schema,
chunk,
predicate,
sort_key,
output_sort_key,
)
}
@ -1785,7 +1789,7 @@ mod test {
let result = Deduplicater::sort_key_of_overlapped_chunks(&chunks).unwrap();
assert_eq!(*result, sort_key);
// partition sort key is a super key of the sort key
// partition sort key is a superset of the chunk sort key
let sort_key = SortKey::from_columns(vec!["tag1", TIME_COLUMN_NAME]);
let partition_sort_key = SortKey::from_columns(vec!["tag1", "tag2", TIME_COLUMN_NAME]);
let chunk1 = Arc::new(

View File

@ -515,7 +515,11 @@ pub fn adjust_sort_key_columns(
))
};
debug!(?primary_key, input_catalog_sort_key=?catalog_sort_key, output_chunk_sort_key=?metadata_sort_key, output_catalog_sort_key=?catalog_update, "Adjusted sort key");
debug!(?primary_key,
input_catalog_sort_key=?catalog_sort_key,
output_chunk_sort_key=?metadata_sort_key,
output_catalog_sort_key=?catalog_update,
"Adjusted sort key");
(metadata_sort_key, catalog_update)
}