refactor: address review comments

pull/24376/head
Nga Tran 2021-07-29 13:38:42 -04:00
parent 0d05ac3961
commit e8828c22e4
3 changed files with 62 additions and 68 deletions

@@ -187,14 +187,14 @@ impl ReorgPlanner {
 Ok((schema, plan))
 }
-/// Creates a scan plan for the set of chunks that:
+/// Creates a scan plan for the given set of chunks.
+/// Output data of the scan will be deduplicated and sorted
+/// on the optimal sort order of the chunks' PK columns (tags and time).
+/// The optimal sort order is computed based on the PK columns' cardinality,
+/// which works best for RLE encoding.
 ///
-/// 1. Merges chunks together into a single stream
-/// 2. Deduplicates via PK as necessary
-/// 3. Sorts the result according to the requested key
 ///
-/// The plan will look like the sorted scan plan specified in
-/// query::provider::build_scan_plan
+/// Refer to query::provider::build_scan_plan for the details of the plan
 ///
 fn sorted_scan_plan<C, I>(&self, schema: Arc<Schema>, chunks: I) -> Result<ScanPlan<C>>
 where
 C: QueryChunk + 'static,
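The "optimal sort order" in the new doc comment can be illustrated standalone. A minimal sketch of the idea (a hypothetical helper, not IOx's actual compute_sort_key): tags with fewer distinct values sort first, so the sorted output contains long runs of repeated values that RLE compresses well.

```rust
// Hypothetical sketch: derive a sort order for PK columns (tags + time)
// from per-column cardinality. Lower-cardinality tags go first so sorted
// output contains long runs of repeated values, which RLE compresses well.
fn optimal_sort_order(tag_cardinalities: &[(&str, usize)]) -> Vec<String> {
    let mut tags = tag_cardinalities.to_vec();
    // Ascending distinct-value count; ties broken by name for determinism.
    tags.sort_by(|a, b| a.1.cmp(&b.1).then(a.0.cmp(b.0)));
    let mut order: Vec<String> = tags.into_iter().map(|(name, _)| name.to_string()).collect();
    // The time column conventionally goes last in the key.
    order.push("time".to_string());
    order
}

fn main() {
    // "region" has 3 distinct values, "host" has 1000: sort on region first.
    let order = optimal_sort_order(&[("host", 1000), ("region", 3)]);
    assert_eq!(order, vec!["region", "host", "time"]);
}
```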
@@ -210,8 +210,8 @@ impl ReorgPlanner {
 // Prepare the plan for the table
 let mut builder = ProviderBuilder::new(table_name, schema);
-// Tell the scan of this provider to sort its output
-builder.sort_output();
+// Tell the scan of this provider to sort its output on the chunks' PK
+builder.ensure_pk_sort();
 // There are no predicates in these plans, so no need to prune them
 builder = builder.add_no_op_pruner();
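For context, the builder sequence in this hunk follows a common Rust shape: a flag that stays off unless explicitly enabled. A self-contained miniature of that pattern (a hypothetical ScanBuilder, mirroring ProviderBuilder only loosely):

```rust
// Hypothetical miniature of the builder shape in this hunk: the sort flag
// is off by default and only set when the planner opts in.
#[derive(Default)]
struct ScanBuilder {
    ensure_pk_sort: bool,
}

struct Scan {
    sorted_on_pk: bool,
}

impl ScanBuilder {
    fn new() -> Self {
        // Never sort the output unless explicitly requested.
        Self::default()
    }

    // Mirrors ProviderBuilder::ensure_pk_sort in the diff: a &mut self
    // setter rather than a consuming builder method.
    fn ensure_pk_sort(&mut self) {
        self.ensure_pk_sort = true;
    }

    fn build(self) -> Scan {
        Scan { sorted_on_pk: self.ensure_pk_sort }
    }
}

fn main() {
    let mut builder = ScanBuilder::new();
    // The reorg planner opts in to PK-sorted scan output.
    builder.ensure_pk_sort();
    assert!(builder.build().sorted_on_pk);
}
```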

@@ -100,8 +100,8 @@ pub struct ProviderBuilder<C: QueryChunk + 'static> {
 schema: Arc<Schema>,
 chunk_pruner: Option<Arc<dyn ChunkPruner<C>>>,
 chunks: Vec<Arc<C>>,
-/// have the scan output sorted on PK
-sort_output: bool,
+/// ensure the output is sorted on the pk columns (in an optimal order computed based on their cardinality)
+ensure_pk_sort: bool,
 }
 impl<C: QueryChunk> ProviderBuilder<C> {
@@ -111,13 +111,13 @@ impl<C: QueryChunk> ProviderBuilder<C> {
 schema,
 chunk_pruner: None,
 chunks: Vec::new(),
-sort_output: false, // never sort the output unless explicitly specified
+ensure_pk_sort: false, // never sort the output unless explicitly specified
 }
 }
 /// Requests the output of the scan sorted
-pub fn sort_output(&mut self) {
-self.sort_output = true;
+pub fn ensure_pk_sort(&mut self) {
+self.ensure_pk_sort = true;
 }
 /// Add a new chunk to this provider
@@ -165,7 +165,7 @@ impl<C: QueryChunk> ProviderBuilder<C> {
 chunk_pruner,
 table_name: self.table_name,
 chunks: self.chunks,
-sort_output: self.sort_output,
+ensure_pk_sort: self.ensure_pk_sort,
 })
 }
 }
@@ -183,8 +183,8 @@ pub struct ChunkTableProvider<C: QueryChunk + 'static> {
 chunk_pruner: Arc<dyn ChunkPruner<C>>,
 // The chunks
 chunks: Vec<Arc<C>>,
-/// have the scan output sorted ok PK
-sort_output: bool,
+/// ensure the output is sorted on the pk columns (in an optimal order computed based on their cardinality)
+ensure_pk_sort: bool,
 }
 impl<C: QueryChunk + 'static> ChunkTableProvider<C> {
@@ -204,8 +204,8 @@ impl<C: QueryChunk + 'static> ChunkTableProvider<C> {
 }
 /// Requests the output of the scan sorted
-pub fn sort_output(&mut self) {
-self.sort_output = true;
+pub fn ensure_pk_sort(&mut self) {
+self.ensure_pk_sort = true;
 }
 }
@@ -261,7 +261,7 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
 scan_schema,
 chunks,
 predicate,
-self.sort_output,
+self.ensure_pk_sort,
 )?;
 Ok(plan)
@@ -478,7 +478,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 // we still need to add this SortPreservingMergeExec because:
 // 1. It will provide a sorted signal (through DataFusion's Distribution::UnspecifiedDistribution)
 // 2. It will not do anything extra if the input is a single partition, so it won't affect performance
-let sort_exprs = arrow_sort_key_exprs(output_sort_key, &plan.schema());
+let sort_exprs = arrow_sort_key_exprs(&output_sort_key, &plan.schema());
 plan = Arc::new(SortPreservingMergeExec::new(sort_exprs, plan, BATCH_SIZE));
 }
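The comment above notes that a SortPreservingMergeExec over a single partition is effectively free. The operator's job is a k-way merge of already-sorted inputs; a self-contained sketch of that idea (plain Rust, not the DataFusion operator):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted partitions into one sorted stream.
/// With a single input partition this degenerates to a pass-through,
/// which is why adding the merge node costs nothing extra in that case.
fn merge_sorted(partitions: Vec<Vec<i64>>) -> Vec<i64> {
    // Heap entries: Reverse((next value, partition index, offset)) for a min-heap.
    let mut heap = BinaryHeap::new();
    for (p, part) in partitions.iter().enumerate() {
        if let Some(&v) = part.first() {
            heap.push(Reverse((v, p, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, p, i))) = heap.pop() {
        out.push(v);
        if let Some(&next) = partitions[p].get(i + 1) {
            heap.push(Reverse((next, p, i + 1)));
        }
    }
    out
}

fn main() {
    // Two pre-sorted inputs; the output preserves the global sort order.
    let merged = merge_sorted(vec![vec![1, 4, 7], vec![2, 3, 9]]);
    assert_eq!(merged, vec![1, 2, 3, 4, 7, 9]);
}
```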
@@ -557,7 +557,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 output_schema: Arc<Schema>,
 chunks: Vec<Arc<C>>, // These chunks are identified overlapped
 predicate: Predicate,
-super_sort_key: &SortKey<'_>,
+output_sort_key: &SortKey<'_>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
 // Note that we may need to sort/deduplicate based on tag
 // columns which do not appear in the output
@@ -565,19 +565,13 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 let pk_schema = Self::compute_pk_schema(&chunks);
 let input_schema = Self::compute_input_schema(&output_schema, &pk_schema);
-// Compute the output sort key which is the super key of chunks' keys based on their data cardinality
-let chunks_sort_key = compute_sort_key(chunks.iter().map(|x| x.summary()));
-// get super key of these chunks with the input one
-let mut output_sort_key = chunks_sort_key;
-if !super_sort_key.is_empty() {
-if let Some(sort_key) = SortKey::try_merge_key(super_sort_key, &output_sort_key) {
-output_sort_key = sort_key;
-} else {
-// Not found the same sort order in the super key, must use the order of super key for the sort
-output_sort_key = super_sort_key.to_owned();
-}
-}
-trace!(output_sort_key=?output_sort_key, "Computed the sort key for the input chunks");
+// Compute the output sort key for these chunks
+let sort_key = if !output_sort_key.is_empty() {
+output_sort_key.to_owned()
+} else {
+compute_sort_key(chunks.iter().map(|x| x.summary()))
+};
+trace!(sort_key=?sort_key, "sort key for the input chunks");
 trace!(
 ?output_schema,
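The refactor replaces the old merge logic with a plain either/or: use the caller's output_sort_key if one was given, otherwise compute a key from the chunk summaries. A minimal sketch of that control flow, with a Vec<String> standing in for SortKey:

```rust
// Stand-in for the simplified selection: prefer the caller-provided output
// sort key; only compute one from the chunk summaries when none is given.
fn effective_sort_key(
    output_sort_key: &[String],
    compute_from_summaries: impl FnOnce() -> Vec<String>,
) -> Vec<String> {
    if !output_sort_key.is_empty() {
        output_sort_key.to_vec()
    } else {
        compute_from_summaries()
    }
}

fn main() {
    let provided = vec!["region".to_string(), "time".to_string()];
    // A non-empty key passes through untouched; the fallback never runs.
    let key = effective_sort_key(&provided, || unreachable!());
    assert_eq!(key, provided);
}
```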
@@ -595,7 +589,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 Arc::clone(&input_schema),
 Arc::clone(&chunk),
 predicate.clone(),
-&output_sort_key,
+&sort_key,
 )
 })
 .collect();
@@ -606,7 +600,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 let plan = UnionExec::new(sorted_chunk_plans?);
 // Now (sort) merge the already sorted chunks
-let sort_exprs = arrow_sort_key_exprs(output_sort_key, &plan.schema());
+let sort_exprs = arrow_sort_key_exprs(&sort_key, &plan.schema());
 let plan = Arc::new(SortPreservingMergeExec::new(
 sort_exprs.clone(),
@@ -651,18 +645,18 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 output_schema: Arc<Schema>,
 chunk: Arc<C>, // This chunk is identified having duplicates
 predicate: Predicate,
-super_sort_key: &SortKey<'_>,
+output_sort_key: &SortKey<'_>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
 let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]);
 let input_schema = Self::compute_input_schema(&output_schema, &pk_schema);
 // Compute the output sort key for this chunk
-let mut output_sort_key = if !super_sort_key.is_empty() {
-super_sort_key.to_owned()
+let mut sort_key = if !output_sort_key.is_empty() {
+output_sort_key.to_owned()
 } else {
 compute_sort_key(vec![chunk.summary()].into_iter())
 };
-trace!(output_sort_key=?output_sort_key, chunk_id=?chunk.id(), "Computed the sort key for the input chunk");
+trace!(sort_key=?sort_key, chunk_id=?chunk.id(), "Computed the sort key for the input chunk");
 // Create the 2 bottom nodes IOxReadFilterNode and SortExec
 let plan = Self::build_sort_plan_for_read_filter(
@@ -670,22 +664,22 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 Arc::clone(&input_schema),
 Arc::clone(&chunk),
 predicate,
-&output_sort_key,
+&sort_key,
 )?;
 // The sort key of this chunk might be only a subset of the output sort key
-if !super_sort_key.is_empty() {
+if !output_sort_key.is_empty() {
 // First get the chunk pk columns
 let schema = chunk.schema();
 let key_columns = schema.primary_key();
 // Now get the key subset of the super key that includes the chunk's pk columns
-output_sort_key = super_sort_key.selected_sort_key(key_columns.clone());
+sort_key = output_sort_key.selected_sort_key(key_columns.clone());
 }
 // Add DeduplicateExec
 // Sort exprs for the deduplication
-let sort_exprs = arrow_sort_key_exprs(output_sort_key, &plan.schema());
+let sort_exprs = arrow_sort_key_exprs(&sort_key, &plan.schema());
 trace!(Sort_Exprs=?sort_exprs, chunk_ID=?chunk.id(), "Sort Expression for the deduplicate node of chunk");
 let plan = Self::add_deduplicate_node(sort_exprs, plan);
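selected_sort_key, as used above, plausibly projects the output key down to the columns present in the chunk's primary key while keeping their relative order; the exact semantics live in IOx's SortKey, so treat this as a hypothetical sketch:

```rust
// Hypothetical sketch of selected_sort_key: keep only the output-key
// columns that belong to this chunk's primary key, preserving the output
// key's relative order.
fn selected_sort_key(output_key: &[&str], chunk_pk: &[&str]) -> Vec<String> {
    output_key
        .iter()
        .copied()
        .filter(|col| chunk_pk.contains(col))
        .map(str::to_string)
        .collect()
}

fn main() {
    // A chunk that lacks the "host" tag deduplicates on the remaining key.
    let key = selected_sort_key(&["region", "host", "time"], &["region", "time"]);
    assert_eq!(key, vec!["region", "time"]);
}
```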
@@ -759,7 +753,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 output_schema: Arc<Schema>,
 chunk: Arc<C>, // This chunk is identified having duplicates
 predicate: Predicate,
-super_sort_key: &SortKey<'_>,
+output_sort_key: &SortKey<'_>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
 // Create the bottom node IOxReadFilterNode for this chunk
 let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
@@ -770,8 +764,8 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 ));
 // Add the sort operator, SortExec, if needed
-if !super_sort_key.is_empty() {
-Self::build_sort_plan(chunk, input, super_sort_key)
+if !output_sort_key.is_empty() {
+Self::build_sort_plan(chunk, input, output_sort_key)
 } else {
 Ok(input)
 }
@@ -782,21 +776,21 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 fn build_sort_plan(
 chunk: Arc<C>,
 input: Arc<dyn ExecutionPlan>,
-super_sort_key: &SortKey<'_>,
+output_sort_key: &SortKey<'_>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-// super_sort_key cannot be empty
-if super_sort_key.is_empty() {
+// output_sort_key cannot be empty
+if output_sort_key.is_empty() {
 panic!("Output sort key is empty");
 }
-trace!(super_sort_key=?super_sort_key, "Super sort key input to build_sort_plan");
+trace!(output_sort_key=?output_sort_key, "Output sort key input to build_sort_plan");
-// Check to see if the plan is sorted on the subset of the super_sort_key
+// Check to see if the plan is sorted on a subset of the output_sort_key
 let sort_key = chunk.sort_key();
 if let Some(chunk_sort_key) = sort_key {
-if let Some(merge_key) = SortKey::try_merge_key(super_sort_key, &chunk_sort_key) {
-if merge_key == *super_sort_key {
-// the chunk is already sorted on the subset of the super_sort_key,
+if let Some(merge_key) = SortKey::try_merge_key(output_sort_key, &chunk_sort_key) {
+if merge_key == *output_sort_key {
+// the chunk is already sorted on a subset of the output_sort_key,
 // no need to resort it
 trace!(ChunkID=?chunk.id(), "Chunk is sorted and no need the sort operator");
 return Ok(input);
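The early return relies on SortKey::try_merge_key; conceptually the merge succeeds when one key's columns appear in the other in the same relative order. A hypothetical sketch of that subset check (an assumption about the semantics, not IOx's implementation):

```rust
// Hypothetical sketch of the compatibility check behind try_merge_key:
// `small` merges cleanly into `big` when its columns appear in `big` in
// the same relative order. If the chunk's own key relates to the output
// key this way, its data is already usable without a SortExec.
fn is_order_preserving_subset(small: &[&str], big: &[&str]) -> bool {
    let mut big_iter = big.iter();
    // Every column of `small` must be found in `big`, scanning left to right.
    small.iter().all(|col| big_iter.any(|b| b == col))
}

fn main() {
    // ("region", "time") fits ("region", "host", "time") with "host" skipped.
    assert!(is_order_preserving_subset(&["region", "time"], &["region", "host", "time"]));
    // Reversed order is incompatible; such a chunk would need a resort.
    assert!(!is_order_preserving_subset(&["time", "region"], &["region", "host", "time"]));
}
```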
@@ -807,7 +801,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 info!(chunk_type=?chunk.chunk_type(),
 chunk_ID=?chunk.id(),
 chunk_current_sort_order=?chunk_sort_key,
-chunk_super_sort_key=?super_sort_key,
+chunk_super_sort_key=?output_sort_key,
 "Chunk will get resorted in build_sort_plan due to new cardinality rate between key columns");
 }
 } else {
@@ -816,14 +810,14 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 "Chunk is not yet sorted and will get sorted in build_sort_plan");
 }
-// Build the chunk's sort key that is a subset of the super_sort_key
+// Build the chunk's sort key that is a subset of the output_sort_key
 //
 // First get the chunk pk columns
 let schema = chunk.schema();
 let key_columns = schema.primary_key();
 // Now get the key subset of the super key that includes the chunk's pk columns
-let chunk_sort_key = super_sort_key.selected_sort_key(key_columns.clone());
+let chunk_sort_key = output_sort_key.selected_sort_key(key_columns.clone());
 info!(chunk_type=?chunk.chunk_type(),
 chunk_ID=?chunk.id(),
@@ -833,7 +827,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 // Build arrow sort expression for the chunk sort key
 let input_schema = input.schema();
-let sort_exprs = arrow_sort_key_exprs(chunk_sort_key, &input_schema);
+let sort_exprs = arrow_sort_key_exprs(&chunk_sort_key, &input_schema);
 trace!(Sort_Exprs=?sort_exprs, Chunk_ID=?chunk.id(), "Sort Expression for the sort operator of chunk");
@@ -847,7 +841,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 /// ```text
 /// ┌─────────────────┐
 /// │    SortExec     │
-/// │   (optional)    │ <-- Only added if the input super_sort_key is not empty
+/// │   (optional)    │ <-- Only added if the input output_sort_key is not empty
 /// └─────────────────┘
 ///          ▲
 ///          │
@@ -862,20 +856,20 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 output_schema: Arc<Schema>,
 chunk: Arc<C>, // This chunk is identified having no duplicates
 predicate: Predicate,
-super_sort_key: &SortKey<'_>,
+output_sort_key: &SortKey<'_>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
 Self::build_sort_plan_for_read_filter(
 table_name,
 output_schema,
 chunk,
 predicate,
-super_sort_key,
+output_sort_key,
 )
 }
 /// Return either:
 /// the simplest IOx scan plan for many chunks which is IOxReadFilterNode
-/// if the input super_sor_key is empty
+/// if the input output_sort_key is empty
 /// ```text
 /// ┌─────────────────┐
 /// │IOxReadFilterNode│
@@ -903,12 +897,12 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 output_schema: Arc<Schema>,
 chunks: Vec<Arc<C>>, // These chunks are identified having no duplicates
 predicate: Predicate,
-super_sort_key: &SortKey<'_>,
+output_sort_key: &SortKey<'_>,
 ) -> Result<Vec<Arc<dyn ExecutionPlan>>> {
 let mut plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
 // Output is not required to be sorted, or no chunks were provided: only create a read filter for all chunks
-if super_sort_key.is_empty() || chunks.is_empty() {
+if output_sort_key.is_empty() || chunks.is_empty() {
 plans.push(Arc::new(IOxReadFilterNode::new(
 Arc::clone(&table_name),
 output_schema.as_arrow(),
@@ -928,7 +922,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
 Arc::clone(&output_schema),
 Arc::clone(&chunk),
 predicate.clone(),
-super_sort_key,
+output_sort_key,
 )
 })
 .collect();

@@ -52,7 +52,7 @@ pub fn arrow_pk_sort_exprs(
 }
 pub fn arrow_sort_key_exprs(
-sort_key: SortKey<'_>,
+sort_key: &SortKey<'_>,
 input_schema: &ArrowSchema,
 ) -> Vec<PhysicalSortExpr> {
 let mut sort_exprs = vec![];
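Passing &SortKey<'_> instead of SortKey<'_> lets callers reuse one key across several plan nodes (e.g., the SortExec and the SortPreservingMergeExec above) without cloning at each call site. A toy illustration of the same borrow pattern (hypothetical Key type, not the IOx one):

```rust
// Toy illustration of the signature change: taking &Key instead of Key
// lets one key drive several plan nodes without a clone per call site.
#[derive(Debug, Clone, PartialEq)]
struct Key(Vec<String>);

// Hypothetical stand-in for arrow_sort_key_exprs(&sort_key, &schema).
fn sort_exprs_for(key: &Key, schema: &[&str]) -> Vec<String> {
    key.0
        .iter()
        .filter(|col| schema.contains(&col.as_str()))
        .map(|col| format!("{col} ASC"))
        .collect()
}

fn main() {
    let key = Key(vec!["region".into(), "time".into()]);
    let schema = ["region", "host", "time"];
    // The same borrowed key feeds both the sort node and the merge node.
    let sort_node = sort_exprs_for(&key, &schema);
    let merge_node = sort_exprs_for(&key, &schema);
    assert_eq!(sort_node, merge_node);
}
```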