feat: full foundation for deduplicate with todo functions to finish

pull/24376/head
Nga Tran 2021-06-06 22:09:01 -04:00
parent 42f26b609b
commit 2f82a9d670
4 changed files with 317 additions and 3 deletions

View File

@ -98,6 +98,15 @@ impl TableSummary {
}
}
/// Returns the primary key of this table
pub fn primary_key(&self) -> Vec<String> {
self.columns
.iter()
.filter(|c| c.key_part())
.map(|c| c.name.clone())
.collect()
}
/// Returns the total number of rows in the columns of this summary
pub fn count(&self) -> u64 {
// Assumes that all tables have the same number of rows, so
@ -186,6 +195,15 @@ impl ColumnSummary {
self.stats.type_name()
}
/// Return true if this column is a part of the primary key which
/// means it is either a tag or timestamp
pub fn key_part(&self) -> bool {
matches!(
self.influxdb_type,
Some(InfluxDbType::Tag) | Some(InfluxDbType::Timestamp)
)
}
/// Return size in bytes of this Column metadata (not the underlying column)
pub fn size(&self) -> usize {
mem::size_of::<Self>() + self.name.len() + self.stats.size()

View File

@ -58,6 +58,11 @@ pub enum Error {
InternalPushdownPredicate {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Internal error while looking for overlapped chunks '{}'", source,))]
InternalSplitOvelappedChunks {
source: datafusion::error::DataFusionError,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -243,10 +248,15 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
// Figure out the schema of the requested output
let scan_schema = project_schema(self.arrow_schema(), projection);
let plan =
IOxReadFilterNode::new(Arc::clone(&self.table_name), scan_schema, chunks, predicate);
let mut deduplicate = Deduplicater::new();
let plan = deduplicate.build_scan_plan(
Arc::clone(&self.table_name),
scan_schema,
chunks,
predicate,
);
Ok(Arc::new(plan))
Ok(plan)
}
fn statistics(&self) -> Statistics {
@ -262,6 +272,282 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
}
}
#[derive(Clone, Debug, Default)]
/// A deduplicater that deduplicate the duplicated data during scan execution
pub(crate) struct Deduplicater<C: PartitionChunk + 'static> {
// a vector of a vector of overlapped chunks
pub overlapped_chunks_set: Vec<Vec<Arc<C>>>,
// a vector of non-overlapped chunks each have duplicates in itself
pub in_chunk_duplicates_chunks: Vec<Arc<C>>,
// a vector of non-overlapped and non-duplicates chunks
pub no_duplicates_chunks: Vec<Arc<C>>,
}
impl<C: PartitionChunk + 'static> Deduplicater<C> {
fn new() -> Self {
Self {
overlapped_chunks_set: vec![],
in_chunk_duplicates_chunks: vec![],
no_duplicates_chunks: vec![],
}
}
/// The IOx scan process needs to deduplicate data if there are duplicates. Hence it will look
/// like this. In this example, there are 4 chunks.
/// . Chunks 1 and 2 overlap and need to get deduplicated. This includes these main steps:
/// i. Read/scan/steam the chunk: IOxReadFilterNode.
/// ii. Sort each chunk if they are not sorted yet: SortExec.
/// iii. Merge the sorted chunks into one stream: SortPreservingMergeExc.
/// iv. Deduplicate the sorted stream: DeduplicateExec
/// . Chunk 3 does not overlap with others but has duplicates in it self, hence it only needs to get
/// sorted if needed, then deduplicated.
/// . Chunk 4 neither overlaps with other chunks nor has duplicates in itself, hence it does not
/// need any extra besides chunk reading.
/// The final UnionExec on top is to union the streams below. If there is only one stream, UnionExec
/// will not be added into the plan.
/// ```text
/// ┌─────────────────┐
/// │ UnionExec │
/// │ │
/// └─────────────────┘
/// ▲
/// │
/// ┌──────────────────────┴───────────┬─────────────────────┐
/// │ │ │
/// │ │ │
/// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
/// │ DeduplicateExec │ │ DeduplicateExec │ │IOxReadFilterNode│
/// └─────────────────┘ └─────────────────┘ │ (Chunk 4) │
/// ▲ ▲ └─────────────────┘
/// │ │
/// ┌───────────────────────┐ │
/// │SortPreservingMergeExec│ │
/// └───────────────────────┘ │
/// ▲ |
/// │ |
/// ┌───────────┴───────────┐ │
/// │ │ │
/// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
/// │ SortExec │ │ SortExec │ │ SortExec │
/// │ (optional) │ │ (optional) │ │ (optional) │
/// └─────────────────┘ └─────────────────┘ └─────────────────┘
/// ▲ ▲ ▲
/// │ │ │
/// │ │ │
/// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
/// │IOxReadFilterNode│ │IOxReadFilterNode│ │IOxReadFilterNode│
/// │ (Chunk 1) │ │ (Chunk 2) │ │ (Chunk 3) │
/// └─────────────────┘ └─────────────────┘ └─────────────────┘
///```
fn build_scan_plan(
&mut self,
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunks: Vec<Arc<C>>,
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
//predicate: Predicate,) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
//finding overlapped chunks and put them into the right group
self.split_overlapped_chunks(chunks.to_vec());
// Building plans
let mut plans = vec![];
if self.no_duplicates() {
// Neither overlaps nor duplicates, no deduplicating needed
let plan = Self::build_plans_for_non_duplicates_chunk(
Arc::clone(&table_name),
Arc::clone(&schema),
chunks,
predicate,
);
plans.push(plan);
} else {
// Go over overlapped set, build deduplicate plan for each vector of overlapped chunks
for overlapped_chunks in self.overlapped_chunks_set.to_vec() {
plans.push(Self::build_deduplicate_plan_for_overlapped_chunks(
Arc::clone(&table_name),
Arc::clone(&schema),
overlapped_chunks.to_owned(),
predicate.clone(),
));
}
// Go over each in_chunk_duplicates_chunks, build deduplicate plan for each
for chunk_with_duplicates in self.in_chunk_duplicates_chunks.to_vec() {
plans.push(Self::build_deduplicate_plan_for_chunk_with_duplicates(
Arc::clone(&table_name),
Arc::clone(&schema),
chunk_with_duplicates.to_owned(),
predicate.clone(),
));
}
// Go over non_duplicates_chunks, build a plan for it
for no_duplicates_chunk in self.no_duplicates_chunks.to_vec() {
plans.push(Self::build_plan_for_non_duplicates_chunk(
Arc::clone(&table_name),
Arc::clone(&schema),
no_duplicates_chunk.to_owned(),
predicate.clone(),
));
}
}
let final_plan = plans.remove(0);
// TODO
// There are still plan, add UnionExec
if !plans.is_empty() {
// final_plan = union_plan
// ....
}
final_plan
}
/// discover overlaps and split them into three groups:
/// 1. vector of vector of overlapped chunks
/// 2. vector of non-overlapped chunks, each have duplicates in itself
/// 3. vectors of non-overlapped chunks without duplicates
fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) {
// TODO: need to discover overlaps and split them
// The current behavior is just like neither overlaps nor having duplicates in its own chunk
//self.overlapped_chunks_set = vec![];
//self.in_chunk_duplicates_chunks = vec![];
self.no_duplicates_chunks.append(&mut chunks.to_vec());
}
/// Return true if all chunks are neither overlap nor has duplicates in itself
fn no_duplicates(&self) -> bool {
self.overlapped_chunks_set.is_empty() && self.in_chunk_duplicates_chunks.is_empty()
}
/// Return deduplicate plan for the given overlapped chunks
/// The plan will look like this
/// ```text
/// ┌─────────────────┐
/// │ DeduplicateExec │
/// └─────────────────┘
/// ▲
/// │
/// ┌───────────────────────┐
/// │SortPreservingMergeExec│
/// └───────────────────────┘
/// ▲
/// │
/// ┌───────────┴───────────┐
/// │ │
/// ┌─────────────────┐ ┌─────────────────┐
/// │ SortExec │ ... │ SortExec │
/// │ (optional) │ │ (optional) │
/// └─────────────────┘ └─────────────────┘
/// ▲ ▲
/// │ ... │
/// │ │
/// ┌─────────────────┐ ┌─────────────────┐
/// │IOxReadFilterNode│ │IOxReadFilterNode│
/// │ (Chunk 1) │ ... │ (Chunk n) │
/// └─────────────────┘ └─────────────────┘
///```
fn build_deduplicate_plan_for_overlapped_chunks(
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunks: Vec<Arc<C>>, // These chunks are identified overlapped
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
// TODO
// Currently return just like there are no overlaps, no duplicates
Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
chunks,
predicate,
))
}
/// Return deduplicate plan for a given chunk with duplicates
/// The plan will look like this
/// ```text
/// ┌─────────────────┐
/// │ DeduplicateExec │
/// └─────────────────┘
/// ▲
/// │
/// ┌─────────────────┐
/// │ SortExec │
/// │ (optional) │
/// └─────────────────┘
/// ▲
/// │
/// │
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk) │
/// └─────────────────┘
///```
fn build_deduplicate_plan_for_chunk_with_duplicates(
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunk: Arc<C>, // This chunk is identified having duplicates
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
// // TODO
// // Currently return just like there are no overlaps, no duplicates
Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
vec![chunk],
predicate,
))
}
/// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
/// ```text
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk) │
/// └─────────────────┘
///```
fn build_plan_for_non_duplicates_chunk(
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunk: Arc<C>, // This chunk is identified having no duplicates
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
vec![chunk],
predicate,
))
}
/// Return the simplest IOx scan plan for many chunks which is IOxReadFilterNode
/// ```text
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk) │
/// └─────────────────┘
///```
fn build_plans_for_non_duplicates_chunk(
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunks: Vec<Arc<C>>, // This chunk is identified having no duplicates
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
chunks,
predicate,
))
}
}
#[derive(Debug)]
/// A pruner that does not do pruning (suitable if no additional pruning is possible)
struct NoOpPruner {}

View File

@ -49,6 +49,12 @@ pub struct ChunkMetadata {
pub schema: Arc<Schema>,
}
impl ChunkMetadata {
pub fn primary_key(&self) -> Vec<String> {
self.table_summary.primary_key()
}
}
/// Different memory representations of a frozen chunk.
#[derive(Debug)]
pub enum ChunkStageFrozenRepr {

View File

@ -165,6 +165,10 @@ impl DbChunk {
})
}
pub fn primary_key(&self) -> Vec<String> {
self.meta.primary_key()
}
/// Return the snapshot of the chunk with type ParquetFile
/// This function should be only invoked when you know your chunk
/// is ParquetFile type whose state is WrittenToObjectStore. The