feat: hook SortPreservingMergeExec into deduplication framework

pull/24376/head
Nga Tran 2021-06-09 23:29:44 -04:00
parent feed19ac8b
commit 4cf05df35b
4 changed files with 209 additions and 36 deletions

1
Cargo.lock generated
View File

@ -885,6 +885,7 @@ name = "data_types"
version = "0.1.0"
dependencies = [
"chrono",
"datafusion 0.1.0",
"influxdb_line_protocol",
"observability_deps",
"percent-encoding",

View File

@ -8,6 +8,7 @@ readme = "README.md"
[dependencies] # In alphabetical order
chrono = { version = "0.4", features = ["serde"] }
datafusion = { path = "../datafusion" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
percent-encoding = "2.1.0"
regex = "1.4"

View File

@ -1,6 +1,9 @@
//! This module contains structs that describe the metadata for a partition
//! including schema, summary statistics, and file locations in storage.
use datafusion::arrow::compute::SortOptions;
use datafusion::physical_plan::expressions::{col, PhysicalSortExpr};
use std::{borrow::Cow, cmp::Ordering, mem};
use serde::{Deserialize, Serialize};
@ -173,6 +176,25 @@ impl TableSummary {
key_summaries
}
/// Returns the pk in arrow's expression used for data sorting
pub fn arrow_pk_sort_exprs(key_summaries: Vec<&ColumnSummary>) -> Vec<PhysicalSortExpr> {
//let key_summaries = self.primary_key_columns();
// build sort expression
let mut sort_exprs = vec![];
for key in key_summaries {
sort_exprs.push(PhysicalSortExpr {
expr: col(key.name.as_str()),
options: SortOptions {
descending: false,
nulls_first: false,
},
});
}
sort_exprs
}
}
// Replicate this enum here as it can't be derived from the existing statistics
@ -202,7 +224,7 @@ pub struct ColumnSummary {
/// Column's Influx data model type (if any)
pub influxdb_type: Option<InfluxDbType>,
/// Per column statistics
/// Per column
pub stats: Statistics,
}

View File

@ -2,7 +2,8 @@
use std::sync::Arc;
use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
use arrow::{datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
use data_types::partition_metadata::TableSummary;
use datafusion::{
datasource::{
datasource::{Statistics, TableProviderFilterPushDown},
@ -11,9 +12,8 @@ use datafusion::{
error::{DataFusionError, Result as DataFusionResult},
logical_plan::Expr,
physical_plan::{
expressions::{col, PhysicalSortExpr},
sort::SortExec,
ExecutionPlan,
expressions::PhysicalSortExpr, sort::SortExec,
sort_preserving_merge::SortPreservingMergeExec, union::UnionExec, ExecutionPlan,
},
};
use internal_types::schema::{merge::SchemaMerger, Schema};
@ -373,7 +373,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
Arc::clone(&schema),
overlapped_chunks.to_owned(),
predicate.clone(),
));
)?);
}
// Go over each in_chunk_duplicates_chunks, build deduplicate plan for each
@ -447,6 +447,11 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
/// ┌───────────────────────┐
/// │SortPreservingMergeExec│
/// └───────────────────────┘
/// ▲
/// │
/// ┌───────────────────────┐
/// │ UnionExec │
/// └───────────────────────┘
/// ▲
/// │
/// ┌───────────┴───────────┐
@ -468,15 +473,38 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
schema: ArrowSchemaRef,
chunks: Vec<Arc<C>>, // These chunks are identified overlapped
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
// TODO
// Currently return just like there are no overlaps, no duplicates
Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
chunks,
predicate,
))
) -> Result<Arc<dyn ExecutionPlan>> {
// Build sort plan for each chunk
let mut sorted_chunk_plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
for chunk in &chunks {
sorted_chunk_plans.push(Self::build_sort_plan_for_read_filter(
Arc::clone(&table_name),
Arc::clone(&schema),
Arc::clone(&chunk),
predicate.clone(),
)?);
}
// TODOs: build primary key by accumulating unique key columns from each chunk's table summary
// use the one of the first chunk for now
let key_summaries = chunks[0].summary().primary_key_columns();
// Union the plans
// The UnionExec operator only streams all chunks (aka partitions in Datafusion) and
// keep them in separate chunks which exactly what we need here
let plan = UnionExec::new(sorted_chunk_plans);
// Now (sort) merge the already sorted chunks
let sort_exprs = TableSummary::arrow_pk_sort_exprs(key_summaries);
let plan = Arc::new(SortPreservingMergeExec::new(
sort_exprs.clone(),
Arc::new(plan),
1024,
));
// Add DeduplicateExc
Self::add_deduplicate_node(sort_exprs, Ok(plan))
}
/// Return deduplicate plan for a given chunk with duplicates
@ -504,6 +532,54 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
schema: ArrowSchemaRef,
chunk: Arc<C>, // This chunk is identified having duplicates
predicate: Predicate,
) -> Result<Arc<dyn ExecutionPlan>> {
// Create the 2 bottom nodes IOxReadFilterNode and SortExec
let plan = Self::build_sort_plan_for_read_filter(
table_name,
schema,
Arc::clone(&chunk),
predicate,
);
// Add DeduplicateExc
// Sort exprs for the deduplication
let key_summaries = chunk.summary().primary_key_columns();
let sort_exprs = TableSummary::arrow_pk_sort_exprs(key_summaries);
Self::add_deduplicate_node(sort_exprs, plan)
}
// Hooks DeduplicateExec on top of the given input plan
fn add_deduplicate_node(
_sort_exprs: Vec<PhysicalSortExpr>,
input: Result<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
// TODOS when DeduplicateExec is build
// Ticket https://github.com/influxdata/influxdb_iox/issues/1646
// Currently simply return the input plan
input
}
/// Return a sort plan for for a given chunk
/// The plan will look like this
/// ```text
/// ┌─────────────────┐
/// │ SortExec │
/// │ (optional) │
/// └─────────────────┘
/// ▲
/// │
/// │
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk) │
/// └─────────────────┘
///```
fn build_sort_plan_for_read_filter(
table_name: Arc<str>,
schema: ArrowSchemaRef,
chunk: Arc<C>, // This chunk is identified having duplicates
predicate: Predicate,
) -> Result<Arc<dyn ExecutionPlan>> {
// Create the bottom node IOxReadFilterNode for this chunk
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
@ -514,14 +590,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
));
// Add the sort operator, SortExec, if needed
//let plan = Self::build_sort_plan(chunk, input);
Self::build_sort_plan(chunk, input)
// Create DeduplicateExc
// TODO: Add DeuplicateExec here when it is implemented in https://github.com/influxdata/influxdb_iox/issues/1646
//plan = add_deduplicate_exec(plan);
//plan
}
/// Add SortExec operator on top of the input plan of the given chunk
@ -534,20 +603,8 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
return Ok(input);
}
// Sort the chunk on pk
let key_summaries = chunk.summary().primary_key_columns();
// build sort expression
let mut sort_exprs = vec![];
for key in key_summaries {
sort_exprs.push(PhysicalSortExpr {
expr: col(key.name.as_str()),
options: SortOptions {
descending: false,
nulls_first: false,
},
});
}
let sort_exprs = TableSummary::arrow_pk_sort_exprs(key_summaries);
// Create SortExec operator
Ok(Arc::new(
@ -580,7 +637,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
/// ```text
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk)
/// │ (Many Chunks) │
/// └─────────────────┘
///```
fn build_plans_for_non_duplicates_chunk(
@ -761,6 +818,98 @@ mod test {
assert_batches_eq!(&expected, &batch);
}
#[tokio::test]
async fn sort_read_filter_plan_for_two_tags_with_time() {
// Chunk 1 with 5 rows of data
let chunk = Arc::new(
TestChunk::new(1)
.with_time_column("t")
.with_tag_column("t", "tag1")
.with_tag_column("t", "tag2")
.with_int_field_column("t", "field_int")
.with_five_rows_of_data("t"),
);
// Datafusion schema of the chunk
let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
let sort_plan = Deduplicater::build_sort_plan_for_read_filter(
Arc::from("t"),
schema,
Arc::clone(&chunk),
Predicate::default(),
);
let batch = collect(sort_plan.unwrap()).await.unwrap();
// data is not sorted on primary key(tag1, tag2, time)
let expected = vec![
"+-----------+------+------+-------------------------------+",
"| field_int | tag1 | tag2 | time |",
"+-----------+------+------+-------------------------------+",
"| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |",
"| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |",
"| 5 | MT | AL | 1970-01-01 00:00:00.000005 |",
"| 10 | MT | AL | 1970-01-01 00:00:00.000007 |",
"| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |",
"+-----------+------+------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
}
#[tokio::test]
async fn deduplicate_plan_for_overlapped_chunks() {
// Chunk 1 with 5 rows of data on 2 tags
let chunk1 = Arc::new(
TestChunk::new(1)
.with_time_column("t")
.with_tag_column("t", "tag1")
.with_tag_column("t", "tag2")
.with_int_field_column("t", "field_int")
.with_five_rows_of_data("t"),
);
// Chunk 2 exactly the same with Chunk 1
let chunk2 = Arc::new(
TestChunk::new(1)
.with_time_column("t")
.with_tag_column("t", "tag1")
.with_tag_column("t", "tag2")
.with_int_field_column("t", "field_int")
.with_five_rows_of_data("t"),
);
// Datafusion schema of the chunk
// the same for 2 chunks
let schema = chunk1.table_schema(Selection::All).unwrap().as_arrow();
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
Arc::from("t"),
schema,
vec![chunk1, chunk2],
Predicate::default(),
);
let batch = collect(sort_plan.unwrap()).await.unwrap();
// data is not sorted on primary key(tag1, tag2, time)
// NOTE: When the full deduplication is done, the duplciates will be removed from this output
let expected = vec![
"+-----------+------+------+-------------------------------+",
"| field_int | tag1 | tag2 | time |",
"+-----------+------+------+-------------------------------+",
"| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |",
"| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |",
"| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |",
"| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |",
"| 5 | MT | AL | 1970-01-01 00:00:00.000005 |",
"| 5 | MT | AL | 1970-01-01 00:00:00.000005 |",
"| 10 | MT | AL | 1970-01-01 00:00:00.000007 |",
"| 10 | MT | AL | 1970-01-01 00:00:00.000007 |",
"| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |",
"| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |",
"+-----------+------+------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
}
fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<_>>();
ids.join(", ")