feat: Implement DeduplicateExec (#1733)

* feat: Implement DeduplicateExec

* fix: Doc comments

* fix: fix comment

* fix: Update with arrow ticket references and use datafusion coalesce batches impl

* refactor: rename inner.rs to algo.rs

* docs: Add additional documentation on rationale for last field value

* docs: Apply suggestions from code review

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

* docs: Update query/src/provider/deduplicate/algo.rs

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

* docs: Apply suggestions from code review

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

* refactor: do not use pub(crate)

* docs: fix test comments

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
Andrew Lamb 2021-06-17 10:17:52 -04:00 committed by GitHub
parent f6dbc8d6f2
commit c5eea9af6a
5 changed files with 1164 additions and 42 deletions

Cargo.lock generated

@@ -3005,6 +3005,7 @@ dependencies = [
"datafusion 0.1.0",
"datafusion_util",
"futures",
"hashbrown 0.11.2",
"internal_types",
"libc",
"metrics",


@@ -23,6 +23,7 @@ data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3"
hashbrown = "0.11"
internal_types = { path = "../internal_types" }
metrics = { path = "../metrics" }
parking_lot = "0.11.1"


@@ -28,8 +28,9 @@ use crate::{
use snafu::{ResultExt, Snafu};
mod adapter;
mod deduplicate;
mod physical;
use self::physical::IOxReadFilterNode;
use self::{deduplicate::DeduplicateExec, physical::IOxReadFilterNode};
#[derive(Debug, Snafu)]
pub enum Error {
@@ -517,7 +518,8 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
));
// Add DeduplicateExc
Self::add_deduplicate_node(sort_exprs, Ok(plan))
let plan = Self::add_deduplicate_node(sort_exprs, plan);
Ok(plan)
}
/// Return deduplicate plan for a given chunk with duplicates
@@ -552,25 +554,22 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
schema,
Arc::clone(&chunk),
predicate,
);
)?;
// Add DeduplicateExc
// Sort exprs for the deduplication
let key_summaries = chunk.summary().primary_key_columns();
let sort_exprs = arrow_pk_sort_exprs(key_summaries);
Self::add_deduplicate_node(sort_exprs, plan)
let plan = Self::add_deduplicate_node(sort_exprs, plan);
Ok(plan)
}
// Hooks DeduplicateExec on top of the given input plan
fn add_deduplicate_node(
_sort_exprs: Vec<PhysicalSortExpr>,
input: Result<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
// TODOS when DeduplicateExec is build
// Ticket https://github.com/influxdata/influxdb_iox/issues/1646
// Currently simply return the input plan
input
sort_exprs: Vec<PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
Arc::new(DeduplicateExec::new(input, sort_exprs))
}
/// Return a sort plan for a given chunk
@@ -902,20 +901,14 @@ mod test {
);
let batch = collect(sort_plan.unwrap()).await.unwrap();
// data is sorted on primary key(tag1, tag2, time)
// NOTE: When the full deduplication is done, the duplicates will be removed from this output
let expected = vec![
"+-----------+------+------+-------------------------------+",
"| field_int | tag1 | tag2 | time |",
"+-----------+------+------+-------------------------------+",
"| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |",
"| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |",
"| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |",
"| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |",
"| 5 | MT | AL | 1970-01-01 00:00:00.000005 |",
"| 5 | MT | AL | 1970-01-01 00:00:00.000005 |",
"| 10 | MT | AL | 1970-01-01 00:00:00.000007 |",
"| 10 | MT | AL | 1970-01-01 00:00:00.000007 |",
"| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |",
"| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |",
"+-----------+------+------+-------------------------------+",
];
@@ -985,21 +978,16 @@ mod test {
);
let batch = collect(plan.unwrap()).await.unwrap();
// Data must be sorted and duplicates removed
// TODO: it is just sorted for now. When https://github.com/influxdata/influxdb_iox/issues/1646
// is done, duplicates will be removed
let expected = vec![
"+-----------+------+-------------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-------------------------------+",
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
"| 10 | AL | 1970-01-01 00:00:00.000000050 |",
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
"| 70 | CT | 1970-01-01 00:00:00.000000500 |",
"| 5 | MT | 1970-01-01 00:00:00.000000005 |",
"| 30 | MT | 1970-01-01 00:00:00.000000005 |",
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
"| 1000 | MT | 1970-01-01 00:00:00.000002 |",
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
"| 20 | MT | 1970-01-01 00:00:00.000007 |",
"+-----------+------+-------------------------------+",
];
@@ -1038,27 +1026,18 @@ mod test {
);
let batch = collect(plan.unwrap()).await.unwrap();
// Two overlapped chunks will be sort merged with duplicates removed
// TODO: it is just sorted for now. When https://github.com/influxdata/influxdb_iox/issues/1646
// is done, duplicates will be removed
let expected = vec![
"+-----------+------+-------------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-------------------------------+",
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
"| 10 | AL | 1970-01-01 00:00:00.000000050 |",
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
"| 70 | CT | 1970-01-01 00:00:00.000000500 |",
"| 5 | MT | 1970-01-01 00:00:00.000000005 |",
"| 30 | MT | 1970-01-01 00:00:00.000000005 |",
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
"| 1000 | MT | 1970-01-01 00:00:00.000002 |",
"| 5 | MT | 1970-01-01 00:00:00.000005 |",
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
"| 20 | MT | 1970-01-01 00:00:00.000007 |",
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
"+-----------+------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
@@ -1124,8 +1103,6 @@ mod test {
// . chunk1 and chunk2 will be sorted merged and deduplicated (rows 8-32)
// . chunk3 will stay in its original (rows 1-3)
// . chunk4 will be sorted and deduplicated (rows 4-7)
// TODO: data is only partially sorted for now. The deduplication will happen when When https://github.com/influxdata/influxdb_iox/issues/1646
// is done
let expected = vec![
"+-----------+------+-------------------------------+",
"| field_int | tag1 | time |",
@@ -1134,24 +1111,16 @@ mod test {
"| 10 | VT | 1970-01-01 00:00:00.000010 |",
"| 70 | UT | 1970-01-01 00:00:00.000020 |",
"| 70 | UT | 1970-01-01 00:00:00.000020 |",
"| 10 | VT | 1970-01-01 00:00:00.000010 |",
"| 50 | VT | 1970-01-01 00:00:00.000010 |",
"| 1000 | WA | 1970-01-01 00:00:00.000008 |",
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
"| 10 | AL | 1970-01-01 00:00:00.000000050 |",
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
"| 70 | CT | 1970-01-01 00:00:00.000000500 |",
"| 5 | MT | 1970-01-01 00:00:00.000000005 |",
"| 30 | MT | 1970-01-01 00:00:00.000000005 |",
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
"| 1000 | MT | 1970-01-01 00:00:00.000002 |",
"| 5 | MT | 1970-01-01 00:00:00.000005 |",
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
"| 20 | MT | 1970-01-01 00:00:00.000007 |",
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
"+-----------+------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
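The refactored `add_deduplicate_node` above now simply wraps the already-sorted input plan in the new operator. Below is a minimal sketch (not part of this commit) of that wiring outside the planner: it assumes `DeduplicateExec` (defined in the new `deduplicate` module shown later in this diff) is in scope, and uses a `MemoryExec` with a single sort column `t1` as a stand-in for the sorted chunk scan; the function name `dedupe_plan_sketch` is invented for illustration.

```rust
use std::sync::Arc;

use arrow::{compute::SortOptions, record_batch::RecordBatch};
use datafusion::physical_plan::{
    collect,
    expressions::{col, PhysicalSortExpr},
    memory::MemoryExec,
    ExecutionPlan,
};

/// Sketch: run pre-sorted batches through DeduplicateExec, mirroring what
/// add_deduplicate_node does inside the planner.
async fn dedupe_plan_sketch(
    batches: Vec<RecordBatch>,
) -> datafusion::error::Result<Vec<RecordBatch>> {
    let schema = batches[0].schema();

    // Stand-in for the already-sorted input produced by the planner
    let input: Arc<dyn ExecutionPlan> =
        Arc::new(MemoryExec::try_new(&[batches], schema, None)?);

    // Sort key the input is assumed to be sorted on (the primary key columns)
    let sort_exprs = vec![PhysicalSortExpr {
        expr: col("t1"),
        options: SortOptions {
            descending: false,
            nulls_first: false,
        },
    }];

    // Hook DeduplicateExec on top of the input, then execute the plan
    let plan: Arc<dyn ExecutionPlan> = Arc::new(DeduplicateExec::new(input, sort_exprs));
    collect(plan).await
}
```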


@@ -0,0 +1,904 @@
//! Implementation of the DeduplicateExec operator (resolves primary key conflicts): plumbing and tests
mod algo;
use std::{fmt, sync::Arc};
use arrow::{
datatypes::SchemaRef,
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
use async_trait::async_trait;
use self::algo::RecordBatchDeduplicator;
use datafusion::{
error::{DataFusionError, Result},
physical_plan::{
expressions::PhysicalSortExpr, DisplayFormatType, Distribution, ExecutionPlan,
Partitioning, RecordBatchStream, SQLMetric, SendableRecordBatchStream,
},
};
use futures::{Stream, StreamExt};
use hashbrown::HashMap;
use observability_deps::tracing::debug;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
/// # DeduplicateExec
///
/// This operator takes an input stream of RecordBatches that is
/// already sorted on "sort_key" and applies IOx specific deduplication
/// logic.
///
/// The output is dependent on the order of the input rows which
/// have the same key.
///
/// Specifically, the value chosen for each non-sort_key column is the
/// "last" non-null value. This is used to model "upserts" when new
/// rows with the same primary key are inserted a second time to update
/// existing values.
///
/// # Example
/// For example, given a sort key of (t1, t2) and the following input
/// (already sorted on t1 and t2):
///
/// ```text
/// +----+----+----+----+
/// | t1 | t2 | f1 | f2 |
/// +----+----+----+----+
/// | a | x | 2 | |
/// | a | x | 2 | 1 |
/// | a | x | | 3 |
/// | a | y | 3 | 1 |
/// | b | y | 3 | |
/// | c | y | 1 | 1 |
/// +----+----+----+----+
/// ```
///
/// This operator will produce the following output (note the values
/// chosen for (a, x)):
///
/// ```text
/// +----+----+----+----+
/// | t1 | t2 | f1 | f2 |
/// +----+----+----+----+
/// | a | x | 2 | 3 |
/// | a | y | 3 | 1 |
/// | b | y | 3 | |
/// | c | y | 1 | 1 |
/// +----+----+----+----+
/// ```
///
/// # Field Resolution (why the last non-null value?)
///
/// The choice of the latest non-null value instead of the latest value is
/// subtle and thus we try to document the rationale here. It is a
/// consequence of the LineProtocol update model.
///
/// Some observations about line protocol are:
///
/// 1. Lines are treated as "UPSERT"s (aka updating any existing
/// values, possibly adding new fields)
///
/// 2. Fields can not be removed or set to NULL via a line (So if a
/// field has a NULL value it means the user didn't provide a value
/// for that field)
///
/// For example, this data (with a NULL for `f2`):
///
/// ```text
/// t1 | f1 | f2
/// ---+----+----
/// a | 1 | 3
/// a | 2 |
/// ```
///
/// Would have come from line protocol like
/// ```text
/// m,t1=a f1=1,f2=3
/// m,t1=a f1=2
/// ```
/// (note there was no value for f2 provided in the second line; it can
/// be read as "upsert value of f1=2, the value of f2 is not modified").
///
/// Thus it would not be correct to take the latest value of f2
/// (NULL), because the source input never provided a value for that field.
#[derive(Debug)]
pub struct DeduplicateExec {
input: Arc<dyn ExecutionPlan>,
sort_keys: Vec<PhysicalSortExpr>,
num_dupes: Arc<SQLMetric>,
}
impl DeduplicateExec {
pub fn new(input: Arc<dyn ExecutionPlan>, sort_keys: Vec<PhysicalSortExpr>) -> Self {
let num_dupes = SQLMetric::counter();
Self {
input,
sort_keys,
num_dupes,
}
}
}
#[async_trait]
impl ExecutionPlan for DeduplicateExec {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> arrow::datatypes::SchemaRef {
self.input.schema()
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![Arc::clone(&self.input)]
}
fn with_new_children(
&self,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
assert_eq!(children.len(), 1);
let input = Arc::clone(&children[0]);
Ok(Arc::new(Self::new(input, self.sort_keys.clone())))
}
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
if partition != 0 {
return Err(DataFusionError::Internal(
"DeduplicateExec only supports a single input stream".to_string(),
));
}
let input_stream = self.input.execute(0).await?;
// the deduplication is performed in a separate task; its output is
// then sent back to this operator's output stream via a channel
let (tx, rx) = mpsc::channel(1);
let task = tokio::task::spawn(deduplicate(
input_stream,
self.sort_keys.clone(),
tx.clone(),
Arc::clone(&self.num_dupes),
));
// A second task watches the output of the worker task
tokio::task::spawn(async move {
let task_result = task.await;
let msg = match task_result {
Err(join_err) => {
debug!(e=%join_err, "Error joining deduplicate task");
Some(ArrowError::ExternalError(Box::new(join_err)))
}
Ok(Err(e)) => {
debug!(%e, "Error in deduplicate task itself");
Some(e)
}
Ok(Ok(())) => {
// successful
None
}
};
if let Some(e) = msg {
// try and tell the receiver something went
// wrong. Note we ignore errors sending this message
// as that means the receiver has already been
// shutdown and no one cares anymore lol
if tx.send(Err(e)).await.is_err() {
debug!("deduplicate receiver hung up");
}
}
});
let stream = AdapterStream {
schema: self.schema(),
inner: ReceiverStream::new(rx),
};
Ok(Box::pin(stream))
}
fn required_child_distribution(&self) -> Distribution {
Distribution::SinglePartition
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match t {
DisplayFormatType::Default => {
let expr: Vec<String> = self.sort_keys.iter().map(|e| e.to_string()).collect();
write!(f, "DeduplicateExec: [{}]", expr.join(","))
}
}
}
fn metrics(&self) -> HashMap<String, SQLMetric> {
let mut metrics = HashMap::new();
metrics.insert("numDuplicates".to_owned(), self.num_dupes.as_ref().clone());
metrics
}
}
async fn deduplicate(
mut input_stream: SendableRecordBatchStream,
sort_keys: Vec<PhysicalSortExpr>,
tx: mpsc::Sender<ArrowResult<RecordBatch>>,
num_dupes: Arc<SQLMetric>,
) -> ArrowResult<()> {
let mut deduplicator = RecordBatchDeduplicator::new(sort_keys, num_dupes);
// Stream input through the indexer
while let Some(batch) = input_stream.next().await {
let batch = batch?;
let output_batch = deduplicator.push(batch)?;
tx.send(Ok(output_batch))
.await
.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
}
// send any left over batch
if let Some(output_batch) = deduplicator.finish()? {
tx.send(Ok(output_batch))
.await
.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
}
Ok(())
}
#[derive(Debug)]
struct AdapterStream {
/// Schema
schema: SchemaRef,
/// channel for getting deduplicated batches
inner: ReceiverStream<ArrowResult<RecordBatch>>,
}
impl Stream for AdapterStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
impl RecordBatchStream for AdapterStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
#[cfg(test)]
mod test {
use arrow::compute::SortOptions;
use arrow::{
array::{ArrayRef, Float64Array, StringArray},
record_batch::RecordBatch,
};
use arrow_util::assert_batches_eq;
use datafusion::physical_plan::{collect, expressions::col, memory::MemoryExec};
use super::*;
#[tokio::test]
async fn test_single_tag() {
// input:
// t1 | f1 | f2
// ---+----+----
// a | 1 |
// a | 2 | 3
// a | | 4
// b | 5 | 6
// c | 7 |
// c | |
// c | | 8
//
// expected output:
//
// t1 | f1 | f2
// ---+----+----
// a | 2 | 4
// b | 5 | 6
// c | 7 | 8
let t1 = StringArray::from(vec![
Some("a"),
Some("a"),
Some("a"),
Some("b"),
Some("c"),
Some("c"),
Some("c"),
]);
let f1 = Float64Array::from(vec![
Some(1.0),
Some(2.0),
None,
Some(5.0),
Some(7.0),
None,
None,
]);
let f2 = Float64Array::from(vec![
None,
Some(3.0),
Some(4.0),
Some(6.0),
None,
None,
Some(8.0),
]);
let batch = RecordBatch::try_from_iter(vec![
("t1", Arc::new(t1) as ArrayRef),
("f1", Arc::new(f1) as ArrayRef),
("f2", Arc::new(f2) as ArrayRef),
])
.unwrap();
let sort_keys = vec![PhysicalSortExpr {
expr: col("t1"),
options: SortOptions {
descending: false,
nulls_first: false,
},
}];
let results = dedupe(vec![batch], sort_keys).await;
let expected = vec![
"+----+----+----+",
"| t1 | f1 | f2 |",
"+----+----+----+",
"| a | 2 | 4 |",
"| b | 5 | 6 |",
"| c | 7 | 8 |",
"+----+----+----+",
];
assert_batches_eq!(&expected, &results.output);
}
#[tokio::test]
async fn test_multi_tag() {
// input:
// t1 | t2 | f1 | f2
// ---+----+----+----
// a | b | 1 |
// a | b | 2 | 3
// a | b | | 4
// a | z | 5 |
// b | b | 6 |
// b | c | 7 | 6
// c | c | 8 |
// d | b | | 9
// e | | 10 | 11
// e | | 12 |
// | f | 13 |
// | f | | 14
//
// expected output:
// t1 | t2 | f1 | f2
// ---+----+----+----
// a | b | 2 | 4
// a | z | 5 |
// b | b | 6 |
// b | c | 7 | 6
// c | c | 8 |
// d | b | | 9
// e | | 12 | 11
// | f | 13 | 14
let t1 = StringArray::from(vec![
Some("a"),
Some("a"),
Some("a"),
Some("a"),
Some("b"),
Some("b"),
Some("c"),
Some("d"),
Some("e"),
Some("e"),
None,
None,
]);
let t2 = StringArray::from(vec![
Some("b"),
Some("b"),
Some("b"),
Some("z"),
Some("b"),
Some("c"),
Some("c"),
Some("b"),
None,
None,
Some("f"),
Some("f"),
]);
let f1 = Float64Array::from(vec![
Some(1.0),
Some(2.0),
None,
Some(5.0),
Some(6.0),
Some(7.0),
Some(8.0),
None,
Some(10.0),
Some(12.0),
Some(13.0),
None,
]);
let f2 = Float64Array::from(vec![
None,
Some(3.0),
Some(4.0),
None,
None,
Some(6.0),
None,
Some(9.0),
Some(11.0),
None,
None,
Some(14.0),
]);
let batch = RecordBatch::try_from_iter(vec![
("t1", Arc::new(t1) as ArrayRef),
("t2", Arc::new(t2) as ArrayRef),
("f1", Arc::new(f1) as ArrayRef),
("f2", Arc::new(f2) as ArrayRef),
])
.unwrap();
let sort_keys = vec![
PhysicalSortExpr {
expr: col("t1"),
options: SortOptions {
descending: false,
nulls_first: false,
},
},
PhysicalSortExpr {
expr: col("t2"),
options: SortOptions {
descending: false,
nulls_first: false,
},
},
];
let results = dedupe(vec![batch], sort_keys).await;
let expected = vec![
"+----+----+----+----+",
"| t1 | t2 | f1 | f2 |",
"+----+----+----+----+",
"| a | b | 2 | 4 |",
"| a | z | 5 | |",
"| b | b | 6 | |",
"| b | c | 7 | 6 |",
"| c | c | 8 | |",
"| d | b | | 9 |",
"| e | | 12 | 11 |",
"| | f | 13 | 14 |",
"+----+----+----+----+",
];
assert_batches_eq!(&expected, &results.output);
}
#[tokio::test]
async fn test_multi_record_batch() {
// input:
// t1 | t2 | f1 | f2
// ---+----+----+----
// a | b | 1 | 2
// a | c | 3 |
// a | c | 4 | 5
// ====(next batch)====
// a | c | | 6
// b | d | 7 | 8
//
// expected output:
// t1 | t2 | f1 | f2
// ---+----+----+----
// a | b | 1 | 2
// a | c | 4 | 6
// b | d | 7 | 8
let t1 = StringArray::from(vec![Some("a"), Some("a"), Some("a")]);
let t2 = StringArray::from(vec![Some("b"), Some("c"), Some("c")]);
let f1 = Float64Array::from(vec![Some(1.0), Some(3.0), Some(4.0)]);
let f2 = Float64Array::from(vec![Some(2.0), None, Some(5.0)]);
let batch1 = RecordBatch::try_from_iter(vec![
("t1", Arc::new(t1) as ArrayRef),
("t2", Arc::new(t2) as ArrayRef),
("f1", Arc::new(f1) as ArrayRef),
("f2", Arc::new(f2) as ArrayRef),
])
.unwrap();
let t1 = StringArray::from(vec![Some("a"), Some("b")]);
let t2 = StringArray::from(vec![Some("b"), Some("d")]);
let f1 = Float64Array::from(vec![None, Some(7.0)]);
let f2 = Float64Array::from(vec![Some(6.0), Some(8.0)]);
let batch2 = RecordBatch::try_from_iter(vec![
("t1", Arc::new(t1) as ArrayRef),
("t2", Arc::new(t2) as ArrayRef),
("f1", Arc::new(f1) as ArrayRef),
("f2", Arc::new(f2) as ArrayRef),
])
.unwrap();
let sort_keys = vec![
PhysicalSortExpr {
expr: col("t1"),
options: SortOptions {
descending: false,
nulls_first: false,
},
},
PhysicalSortExpr {
expr: col("t2"),
options: SortOptions {
descending: false,
nulls_first: false,
},
},
];
let results = dedupe(vec![batch1, batch2], sort_keys).await;
let expected = vec![
"+----+----+----+----+",
"| t1 | t2 | f1 | f2 |",
"+----+----+----+----+",
"| a | b | 1 | 2 |",
"| a | c | 4 | 6 |",
"| b | d | 7 | 8 |",
"+----+----+----+----+",
];
assert_batches_eq!(&expected, &results.output);
// 5 rows in initial input, 3 rows in output ==> 2 dupes
assert_eq!(results.num_dupes(), 5 - 3);
}
#[tokio::test]
async fn test_no_dupes() {
// special case test for data without duplicates (fast path)
// input:
// t1 | f1
// ---+----
// a | 1
// ====(next batch)====
// b | 2
//
// expected output:
//
// t1 | f1
// ---+----
// a | 1
// b | 2
let t1 = StringArray::from(vec![Some("a")]);
let f1 = Float64Array::from(vec![Some(1.0)]);
let batch1 = RecordBatch::try_from_iter(vec![
("t1", Arc::new(t1) as ArrayRef),
("f1", Arc::new(f1) as ArrayRef),
])
.unwrap();
let t1 = StringArray::from(vec![Some("b")]);
let f1 = Float64Array::from(vec![Some(2.0)]);
let batch2 = RecordBatch::try_from_iter(vec![
("t1", Arc::new(t1) as ArrayRef),
("f1", Arc::new(f1) as ArrayRef),
])
.unwrap();
let sort_keys = vec![PhysicalSortExpr {
expr: col("t1"),
options: SortOptions {
descending: false,
nulls_first: false,
},
}];
let results = dedupe(vec![batch1, batch2], sort_keys).await;
let expected = vec![
"+----+----+",
"| t1 | f1 |",
"+----+----+",
"| a | 1 |",
"| b | 2 |",
"+----+----+",
];
assert_batches_eq!(&expected, &results.output);
// also validate there were no dupes detected
assert_eq!(results.num_dupes(), 0);
}
#[tokio::test]
async fn test_single_pk() {
// test boundary condition
// input:
// t1 | f1 | f2
// ---+----+----
// a | 1 | 2
// a | 3 | 4
//
// expected output:
//
// t1 | f1 | f2
// ---+----+----
// a | 3 | 4
let t1 = StringArray::from(vec![Some("a"), Some("a")]);
let f1 = Float64Array::from(vec![Some(1.0), Some(3.0)]);
let f2 = Float64Array::from(vec![Some(2.0), Some(4.0)]);
let batch = RecordBatch::try_from_iter(vec![
("t1", Arc::new(t1) as ArrayRef),
("f1", Arc::new(f1) as ArrayRef),
("f2", Arc::new(f2) as ArrayRef),
])
.unwrap();
let sort_keys = vec![PhysicalSortExpr {
expr: col("t1"),
options: SortOptions {
descending: false,
nulls_first: false,
},
}];
let results = dedupe(vec![batch], sort_keys).await;
let expected = vec![
"+----+----+----+",
"| t1 | f1 | f2 |",
"+----+----+----+",
"| a | 3 | 4 |",
"+----+----+----+",
];
assert_batches_eq!(&expected, &results.output);
}
#[tokio::test]
async fn test_column_reorder() {
// test the case where fields come before tags and the tags are not in sort key order
// input:
// f1 | t2 | t1
// ---+----+----
// 1 | a | a
// 2 | a | a
// 3 | a | b
// 4 | b | b
//
// expected output:
//
// f1 | t2 | t1
// ---+----+----
// 2 | a | a
// 3 | a | b
// 4 | b | b
let f1 = Float64Array::from(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]);
let t2 = StringArray::from(vec![Some("a"), Some("a"), Some("a"), Some("b")]);
let t1 = StringArray::from(vec![Some("a"), Some("a"), Some("b"), Some("b")]);
let batch = RecordBatch::try_from_iter(vec![
("f1", Arc::new(f1) as ArrayRef),
("t2", Arc::new(t2) as ArrayRef),
("t1", Arc::new(t1) as ArrayRef),
])
.unwrap();
let sort_keys = vec![
PhysicalSortExpr {
expr: col("t1"),
options: SortOptions {
descending: false,
nulls_first: false,
},
},
PhysicalSortExpr {
expr: col("t2"),
options: SortOptions {
descending: false,
nulls_first: false,
},
},
];
let results = dedupe(vec![batch], sort_keys).await;
let expected = vec![
"+----+----+----+",
"| f1 | t2 | t1 |",
"+----+----+----+",
"| 2 | a | a |",
"| 3 | a | b |",
"| 4 | b | b |",
"+----+----+----+",
];
assert_batches_eq!(&expected, &results.output);
}
#[tokio::test]
async fn test_input_error_propagated() {
// test that an error from the input gets to the output
// input:
// t1 | f1
// ---+----
// a | 1
// === next batch ===
// (error)
let t1 = StringArray::from(vec![Some("a")]);
let f1 = Float64Array::from(vec![Some(1.0)]);
let batch = RecordBatch::try_from_iter(vec![
("t1", Arc::new(t1) as ArrayRef),
("f1", Arc::new(f1) as ArrayRef),
])
.unwrap();
let schema = batch.schema();
let batches = vec![
Ok(batch),
Err(ArrowError::ComputeError("This is the error".to_string())),
];
let input = Arc::new(DummyExec {
schema: Arc::clone(&schema),
batches,
});
let sort_keys = vec![PhysicalSortExpr {
expr: col("t1"),
options: SortOptions {
descending: false,
nulls_first: false,
},
}];
let exec = Arc::new(DeduplicateExec::new(input, sort_keys));
let output = collect(Arc::clone(&exec) as Arc<dyn ExecutionPlan>)
.await
.unwrap_err()
.to_string();
assert!(
output.contains("Compute error: This is the error"),
"actual output: {}",
output
);
}
struct TestResults {
output: Vec<RecordBatch>,
exec: Arc<DeduplicateExec>,
}
impl TestResults {
/// return the number of duplicates this deduplicator detected
fn num_dupes(&self) -> usize {
self.exec
.metrics()
.get("numDuplicates")
.expect("No dupe metrics found")
.value()
}
}
/// Run the input through the deduplicator and return results
async fn dedupe(input: Vec<RecordBatch>, sort_keys: Vec<PhysicalSortExpr>) -> TestResults {
test_helpers::maybe_start_logging();
// Setup in memory stream
let schema = input[0].schema();
let projection = None;
let input = Arc::new(MemoryExec::try_new(&[input], schema, projection).unwrap());
// Create and run the deduplicator
let exec = Arc::new(DeduplicateExec::new(input, sort_keys));
let output = collect(Arc::clone(&exec) as Arc<dyn ExecutionPlan>)
.await
.unwrap();
TestResults { output, exec }
}
/// A PhysicalPlan that sends a specific set of
/// Result<RecordBatch> for testing.
#[derive(Debug)]
struct DummyExec {
schema: SchemaRef,
batches: Vec<ArrowResult<RecordBatch>>,
}
#[async_trait]
impl ExecutionPlan for DummyExec {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn output_partitioning(&self) -> Partitioning {
unimplemented!();
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
&self,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
assert_eq!(partition, 0);
// ensure there is space to queue up the channel
let (tx, rx) = mpsc::channel(self.batches.len());
// queue up all the results
for r in &self.batches {
match r {
Ok(batch) => tx.send(Ok(batch.clone())).await.unwrap(),
Err(e) => tx.send(Err(clone_error(e))).await.unwrap(),
}
}
let stream = AdapterStream {
schema: self.schema(),
inner: ReceiverStream::new(rx),
};
Ok(Box::pin(stream))
}
}
fn clone_error(e: &ArrowError) -> ArrowError {
use ArrowError::*;
match e {
ComputeError(msg) => ComputeError(msg.to_string()),
_ => unimplemented!(),
}
}
}
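As an aside on the plumbing in `execute()` above (worker task, watcher task, and a channel exposed as a stream): the same pattern can be shown standalone. This is a sketch only, using toy `Result<u64, String>` items in place of `ArrowResult<RecordBatch>`; the `worker` function and its item type are invented for illustration.

```rust
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

/// Toy stand-in for the deduplicate() worker: produce some output, then fail.
async fn worker(tx: mpsc::Sender<Result<u64, String>>) -> Result<(), String> {
    for i in 0..3u64 {
        tx.send(Ok(i)).await.map_err(|e| e.to_string())?;
    }
    Err("simulated failure after producing output".to_string())
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1);

    // The worker task does the real work (deduplicate() in DeduplicateExec)
    let task = tokio::task::spawn(worker(tx.clone()));

    // A second task watches the worker and forwards join errors or worker
    // errors into the same channel so the consumer sees them
    tokio::task::spawn(async move {
        let msg = match task.await {
            Err(join_err) => Some(join_err.to_string()),
            Ok(Err(e)) => Some(e),
            Ok(Ok(())) => None,
        };
        if let Some(e) = msg {
            // ignore send errors: the receiver may already have hung up
            let _ = tx.send(Err(e)).await;
        }
    });

    // The receiver side is exposed as a Stream, as AdapterStream does
    let mut output = ReceiverStream::new(rx);
    while let Some(item) = output.next().await {
        println!("{:?}", item);
    }
}
```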


@@ -0,0 +1,247 @@
//! Implementation of Deduplication algorithm
use std::{ops::Range, sync::Arc};
use arrow::{
array::{ArrayRef, UInt64Array},
compute::TakeOptions,
error::Result as ArrowResult,
record_batch::RecordBatch,
};
use datafusion::physical_plan::{
coalesce_batches::concat_batches, expressions::PhysicalSortExpr, PhysicalExpr, SQLMetric,
};
use observability_deps::tracing::debug;
// Handles the deduplication across potentially multiple
// [`RecordBatch`]es which are already sorted on a primary key,
// including primary keys which straddle RecordBatch boundaries
#[derive(Debug)]
pub(crate) struct RecordBatchDeduplicator {
sort_keys: Vec<PhysicalSortExpr>,
last_batch: Option<RecordBatch>,
num_dupes: Arc<SQLMetric>,
}
#[derive(Debug)]
struct DuplicateRanges {
/// `is_sort_key[col_idx] = true` if the input column at
/// `col_idx` is present in sort keys
is_sort_key: Vec<bool>,
/// ranges of row indices where the sort key columns have the
/// same values
ranges: Vec<Range<usize>>,
}
impl RecordBatchDeduplicator {
pub fn new(sort_keys: Vec<PhysicalSortExpr>, num_dupes: Arc<SQLMetric>) -> Self {
Self {
sort_keys,
last_batch: None,
num_dupes,
}
}
/// Push a new RecordBatch into the indexer. Returns a
/// deduplicated RecordBatch and remembers any currently opened
/// groups
pub fn push(&mut self, batch: RecordBatch) -> ArrowResult<RecordBatch> {
// If we had a previous batch of rows, add it in here
//
// Potential optimization would be to check if the sort key is actually the same
// for the first row in the new batch and skip this concat if that is the case
let batch = if let Some(last_batch) = self.last_batch.take() {
let schema = last_batch.schema();
let row_count = last_batch.num_rows() + batch.num_rows();
concat_batches(&schema, &[last_batch, batch], row_count)?
} else {
batch
};
let mut dupe_ranges = self.compute_ranges(&batch)?;
// The last partition may span batches so we can't emit it
// until we have seen the next batch (or we are at end of
// stream)
let last_range = dupe_ranges.ranges.pop();
let output_record_batch = self.output_from_ranges(&batch, &dupe_ranges)?;
// Now, save the last bit of the pk
if let Some(last_range) = last_range {
let len = last_range.end - last_range.start;
let last_batch = Self::slice_record_batch(&batch, last_range.start, len)?;
self.last_batch = Some(last_batch);
}
Ok(output_record_batch)
}
/// Consume the indexer, returning any remaining record batches for output
pub fn finish(mut self) -> ArrowResult<Option<RecordBatch>> {
self.last_batch
.take()
.map(|last_batch| {
let dupe_ranges = self.compute_ranges(&last_batch)?;
self.output_from_ranges(&last_batch, &dupe_ranges)
})
.transpose()
}
/// Computes the ranges where the sort key has the same values
fn compute_ranges(&self, batch: &RecordBatch) -> ArrowResult<DuplicateRanges> {
let schema = batch.schema();
// is_sort_key[col_idx] = true if it is present in sort keys
let mut is_sort_key: Vec<bool> = vec![false; batch.columns().len()];
// Figure out where the partitions are:
let columns: Vec<_> = self
.sort_keys
.iter()
.map(|skey| {
// figure out what input column this is for
let name = get_col_name(skey.expr.as_ref());
let index = schema.index_of(name).unwrap();
is_sort_key[index] = true;
let array = batch.column(index);
arrow::compute::SortColumn {
values: Arc::clone(array),
options: Some(skey.options),
}
})
.collect();
// Compute partitions (aka breakpoints between the ranges)
let ranges = arrow::compute::lexicographical_partition_ranges(&columns)?;
Ok(DuplicateRanges {
is_sort_key,
ranges,
})
}
/// Compute the output record batch that includes the specified ranges
fn output_from_ranges(
&self,
batch: &RecordBatch,
dupe_ranges: &DuplicateRanges,
) -> ArrowResult<RecordBatch> {
let ranges = &dupe_ranges.ranges;
// each range is at least 1 large, so any that have more than
// 1 are duplicates
let num_dupes = ranges.iter().map(|r| r.end - r.start - 1).sum();
self.num_dupes.add(num_dupes);
// Special case when no ranges are duplicated (so just emit input as output)
if num_dupes == 0 {
debug!(num_rows = batch.num_rows(), "No dupes");
Self::slice_record_batch(&batch, 0, ranges.len())
} else {
debug!(num_dupes, num_rows = batch.num_rows(), "dupes");
// Use take kernel
let sort_key_indices = self.compute_sort_key_indices(&ranges);
let take_options = Some(TakeOptions {
check_bounds: false,
});
// Form each new column by `take`ing the indices as needed
let new_columns = batch
.columns()
.iter()
.enumerate()
.map(|(input_index, input_array)| {
if dupe_ranges.is_sort_key[input_index] {
arrow::compute::take(
input_array.as_ref(),
&sort_key_indices,
take_options.clone(),
)
} else {
// pick the last non null value
let field_indices = self.compute_field_indices(&ranges, input_array);
arrow::compute::take(
input_array.as_ref(),
&field_indices,
take_options.clone(),
)
}
})
.collect::<ArrowResult<Vec<ArrayRef>>>()?;
RecordBatch::try_new(batch.schema(), new_columns)
}
}
/// Returns an array of indices, one for each input range (which
/// index is arbitrary as all the values are the same for the sort
/// column in each pk group)
///
/// ranges: 0-1, 2-4, 5-6 --> Array[0, 2, 5]
fn compute_sort_key_indices(&self, ranges: &[Range<usize>]) -> UInt64Array {
ranges.iter().map(|r| Some(r.start as u64)).collect()
}
/// Returns an array of indices, one for each input range, that
/// selects the last non-null value of `input_array` in that range
/// (aka it will pick the index of the field value to use for each
/// pk group)
///
/// ranges: 0-1, 2-4, 5-6
/// input array: A, NULL, NULL, C, NULL, NULL
/// --> Array[0, 3, 5]
fn compute_field_indices(
&self,
ranges: &[Range<usize>],
input_array: &ArrayRef,
) -> UInt64Array {
ranges
.iter()
.map(|r| {
let value_index = r
.clone()
.filter(|&i| input_array.is_valid(i))
.last()
.map(|i| i as u64)
// if all field values are none, pick one arbitrarily
.unwrap_or(r.start as u64);
Some(value_index)
})
.collect()
}
/// Create a new record batch from offset --> len
///
/// https://github.com/apache/arrow-rs/issues/460 for adding this upstream
fn slice_record_batch(
batch: &RecordBatch,
offset: usize,
len: usize,
) -> ArrowResult<RecordBatch> {
let schema = batch.schema();
let new_columns: Vec<_> = batch
.columns()
.iter()
.map(|old_column| old_column.slice(offset, len))
.collect();
RecordBatch::try_new(schema, new_columns)
}
}
/// Get column name out of the `expr`. TODO use
/// internal_types::schema::SortKey instead.
fn get_col_name(expr: &dyn PhysicalExpr) -> &str {
expr.as_any()
.downcast_ref::<datafusion::physical_plan::expressions::Column>()
.expect("expected column reference")
.name()
}