Merge branch 'main' into logsdebug

pull/24376/head
kodiakhq[bot] 2021-06-07 10:22:26 +00:00 committed by GitHub
commit 09c98cc5bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 282 additions and 3 deletions

View File

@ -243,10 +243,15 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
// Figure out the schema of the requested output
let scan_schema = project_schema(self.arrow_schema(), projection);
let plan =
IOxReadFilterNode::new(Arc::clone(&self.table_name), scan_schema, chunks, predicate);
let mut deduplicate = Deduplicater::new();
let plan = deduplicate.build_scan_plan(
Arc::clone(&self.table_name),
scan_schema,
chunks,
predicate,
);
Ok(Arc::new(plan))
Ok(plan)
}
fn statistics(&self) -> Statistics {
@ -262,6 +267,280 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
}
}
#[derive(Clone, Debug, Default)]
/// A deduplicater that deduplicate the duplicated data during scan execution
pub(crate) struct Deduplicater<C: PartitionChunk + 'static> {
// a vector of a vector of overlapped chunks
pub overlapped_chunks_set: Vec<Vec<Arc<C>>>,
// a vector of non-overlapped chunks each have duplicates in itself
pub in_chunk_duplicates_chunks: Vec<Arc<C>>,
// a vector of non-overlapped and non-duplicates chunks
pub no_duplicates_chunks: Vec<Arc<C>>,
}
impl<C: PartitionChunk + 'static> Deduplicater<C> {
fn new() -> Self {
Self {
overlapped_chunks_set: vec![],
in_chunk_duplicates_chunks: vec![],
no_duplicates_chunks: vec![],
}
}
/// The IOx scan process needs to deduplicate data if there are duplicates. Hence it will look
/// like this. In this example, there are 4 chunks.
/// . Chunks 1 and 2 overlap and need to get deduplicated. This includes these main steps:
/// i. Read/scan/steam the chunk: IOxReadFilterNode.
/// ii. Sort each chunk if they are not sorted yet: SortExec.
/// iii. Merge the sorted chunks into one stream: SortPreservingMergeExc.
/// iv. Deduplicate the sorted stream: DeduplicateExec
/// . Chunk 3 does not overlap with others but has duplicates in it self, hence it only needs to get
/// sorted if needed, then deduplicated.
/// . Chunk 4 neither overlaps with other chunks nor has duplicates in itself, hence it does not
/// need any extra besides chunk reading.
/// The final UnionExec on top is to union the streams below. If there is only one stream, UnionExec
/// will not be added into the plan.
/// ```text
/// ┌─────────────────┐
/// │ UnionExec │
/// │ │
/// └─────────────────┘
/// ▲
/// │
/// ┌──────────────────────┴───────────┬─────────────────────┐
/// │ │ │
/// │ │ │
/// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
/// │ DeduplicateExec │ │ DeduplicateExec │ │IOxReadFilterNode│
/// └─────────────────┘ └─────────────────┘ │ (Chunk 4) │
/// ▲ ▲ └─────────────────┘
/// │ │
/// ┌───────────────────────┐ │
/// │SortPreservingMergeExec│ │
/// └───────────────────────┘ │
/// ▲ |
/// │ |
/// ┌───────────┴───────────┐ │
/// │ │ │
/// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
/// │ SortExec │ │ SortExec │ │ SortExec │
/// │ (optional) │ │ (optional) │ │ (optional) │
/// └─────────────────┘ └─────────────────┘ └─────────────────┘
/// ▲ ▲ ▲
/// │ │ │
/// │ │ │
/// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
/// │IOxReadFilterNode│ │IOxReadFilterNode│ │IOxReadFilterNode│
/// │ (Chunk 1) │ │ (Chunk 2) │ │ (Chunk 3) │
/// └─────────────────┘ └─────────────────┘ └─────────────────┘
///```
fn build_scan_plan(
&mut self,
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunks: Vec<Arc<C>>,
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
//finding overlapped chunks and put them into the right group
self.split_overlapped_chunks(chunks.to_vec());
// Building plans
let mut plans = vec![];
if self.no_duplicates() {
// Neither overlaps nor duplicates, no deduplicating needed
let plan = Self::build_plans_for_non_duplicates_chunk(
Arc::clone(&table_name),
Arc::clone(&schema),
chunks,
predicate,
);
plans.push(plan);
} else {
// Go over overlapped set, build deduplicate plan for each vector of overlapped chunks
for overlapped_chunks in self.overlapped_chunks_set.to_vec() {
plans.push(Self::build_deduplicate_plan_for_overlapped_chunks(
Arc::clone(&table_name),
Arc::clone(&schema),
overlapped_chunks.to_owned(),
predicate.clone(),
));
}
// Go over each in_chunk_duplicates_chunks, build deduplicate plan for each
for chunk_with_duplicates in self.in_chunk_duplicates_chunks.to_vec() {
plans.push(Self::build_deduplicate_plan_for_chunk_with_duplicates(
Arc::clone(&table_name),
Arc::clone(&schema),
chunk_with_duplicates.to_owned(),
predicate.clone(),
));
}
// Go over non_duplicates_chunks, build a plan for it
for no_duplicates_chunk in self.no_duplicates_chunks.to_vec() {
plans.push(Self::build_plan_for_non_duplicates_chunk(
Arc::clone(&table_name),
Arc::clone(&schema),
no_duplicates_chunk.to_owned(),
predicate.clone(),
));
}
}
let final_plan = plans.remove(0);
// TODO
// There are still plan, add UnionExec
if !plans.is_empty() {
// final_plan = union_plan
panic!("Unexpected error: There should be only one output for scan plan");
}
final_plan
}
/// discover overlaps and split them into three groups:
/// 1. vector of vector of overlapped chunks
/// 2. vector of non-overlapped chunks, each have duplicates in itself
/// 3. vectors of non-overlapped chunks without duplicates
fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) {
// TODO: need to discover overlaps and split them
// The current behavior is just like neither overlaps nor having duplicates in its own chunk
//self.overlapped_chunks_set = vec![];
//self.in_chunk_duplicates_chunks = vec![];
self.no_duplicates_chunks.append(&mut chunks.to_vec());
}
/// Return true if all chunks are neither overlap nor has duplicates in itself
fn no_duplicates(&self) -> bool {
self.overlapped_chunks_set.is_empty() && self.in_chunk_duplicates_chunks.is_empty()
}
/// Return deduplicate plan for the given overlapped chunks
/// The plan will look like this
/// ```text
/// ┌─────────────────┐
/// │ DeduplicateExec │
/// └─────────────────┘
/// ▲
/// │
/// ┌───────────────────────┐
/// │SortPreservingMergeExec│
/// └───────────────────────┘
/// ▲
/// │
/// ┌───────────┴───────────┐
/// │ │
/// ┌─────────────────┐ ┌─────────────────┐
/// │ SortExec │ ... │ SortExec │
/// │ (optional) │ │ (optional) │
/// └─────────────────┘ └─────────────────┘
/// ▲ ▲
/// │ ... │
/// │ │
/// ┌─────────────────┐ ┌─────────────────┐
/// │IOxReadFilterNode│ │IOxReadFilterNode│
/// │ (Chunk 1) │ ... │ (Chunk n) │
/// └─────────────────┘ └─────────────────┘
///```
fn build_deduplicate_plan_for_overlapped_chunks(
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunks: Vec<Arc<C>>, // These chunks are identified overlapped
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
// TODO
// Currently return just like there are no overlaps, no duplicates
Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
chunks,
predicate,
))
}
/// Return deduplicate plan for a given chunk with duplicates
/// The plan will look like this
/// ```text
/// ┌─────────────────┐
/// │ DeduplicateExec │
/// └─────────────────┘
/// ▲
/// │
/// ┌─────────────────┐
/// │ SortExec │
/// │ (optional) │
/// └─────────────────┘
/// ▲
/// │
/// │
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk) │
/// └─────────────────┘
///```
fn build_deduplicate_plan_for_chunk_with_duplicates(
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunk: Arc<C>, // This chunk is identified having duplicates
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
// // TODO
// // Currently return just like there are no overlaps, no duplicates
Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
vec![chunk],
predicate,
))
}
/// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
/// ```text
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk) │
/// └─────────────────┘
///```
fn build_plan_for_non_duplicates_chunk(
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunk: Arc<C>, // This chunk is identified having no duplicates
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
vec![chunk],
predicate,
))
}
/// Return the simplest IOx scan plan for many chunks which is IOxReadFilterNode
/// ```text
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk) │
/// └─────────────────┘
///```
fn build_plans_for_non_duplicates_chunk(
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunks: Vec<Arc<C>>, // This chunk is identified having no duplicates
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
chunks,
predicate,
))
}
}
#[derive(Debug)]
/// A pruner that does not do pruning (suitable if no additional pruning is possible)
struct NoOpPruner {}