diff --git a/Cargo.lock b/Cargo.lock index 4fcaae34db..b24d9a93cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3005,6 +3005,7 @@ dependencies = [ "datafusion 0.1.0", "datafusion_util", "futures", + "hashbrown 0.11.2", "internal_types", "libc", "metrics", diff --git a/query/Cargo.toml b/query/Cargo.toml index 70455ec11a..1bcf265115 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -23,6 +23,7 @@ data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } datafusion_util = { path = "../datafusion_util" } futures = "0.3" +hashbrown = "0.11" internal_types = { path = "../internal_types" } metrics = { path = "../metrics" } parking_lot = "0.11.1" diff --git a/query/src/provider.rs b/query/src/provider.rs index cfcf5ca1c6..4bfbc39c47 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -28,8 +28,9 @@ use crate::{ use snafu::{ResultExt, Snafu}; mod adapter; +mod deduplicate; mod physical; -use self::physical::IOxReadFilterNode; +use self::{deduplicate::DeduplicateExec, physical::IOxReadFilterNode}; #[derive(Debug, Snafu)] pub enum Error { @@ -517,7 +518,8 @@ impl Deduplicater { )); // Add DeduplicateExc - Self::add_deduplicate_node(sort_exprs, Ok(plan)) + let plan = Self::add_deduplicate_node(sort_exprs, plan); + Ok(plan) } /// Return deduplicate plan for a given chunk with duplicates @@ -552,25 +554,22 @@ impl Deduplicater { schema, Arc::clone(&chunk), predicate, - ); + )?; // Add DeduplicateExc // Sort exprs for the deduplication let key_summaries = chunk.summary().primary_key_columns(); let sort_exprs = arrow_pk_sort_exprs(key_summaries); - Self::add_deduplicate_node(sort_exprs, plan) + let plan = Self::add_deduplicate_node(sort_exprs, plan); + Ok(plan) } // Hooks DeduplicateExec on top of the given input plan fn add_deduplicate_node( - _sort_exprs: Vec, - input: Result>, - ) -> Result> { - // TODOS when DeduplicateExec is build - // Ticket https://github.com/influxdata/influxdb_iox/issues/1646 - - // Currently simply return the input plan - input + sort_exprs: Vec, + input: Arc, + ) -> Arc { + Arc::new(DeduplicateExec::new(input, sort_exprs)) } /// Return a sort plan for for a given chunk @@ -902,20 +901,14 @@ mod test { ); let batch = collect(sort_plan.unwrap()).await.unwrap(); // data is sorted on primary key(tag1, tag2, time) - // NOTE: When the full deduplication is done, the duplicates will be removed from this output let expected = vec![ "+-----------+------+------+-------------------------------+", "| field_int | tag1 | tag2 | time |", "+-----------+------+------+-------------------------------+", "| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |", - "| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |", - "| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |", "| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |", "| 5 | MT | AL | 1970-01-01 00:00:00.000005 |", - "| 5 | MT | AL | 1970-01-01 00:00:00.000005 |", "| 10 | MT | AL | 1970-01-01 00:00:00.000007 |", - "| 10 | MT | AL | 1970-01-01 00:00:00.000007 |", - "| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |", "| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |", "+-----------+------+------+-------------------------------+", ]; @@ -985,21 +978,16 @@ mod test { ); let batch = collect(plan.unwrap()).await.unwrap(); // Data must be sorted and duplicates removed - // TODO: it is just sorted for now. 
When https://github.com/influxdata/influxdb_iox/issues/1646 - // is done, duplicates will be removed let expected = vec![ "+-----------+------+-------------------------------+", "| field_int | tag1 | time |", "+-----------+------+-------------------------------+", - "| 100 | AL | 1970-01-01 00:00:00.000000050 |", "| 10 | AL | 1970-01-01 00:00:00.000000050 |", "| 70 | CT | 1970-01-01 00:00:00.000000100 |", "| 70 | CT | 1970-01-01 00:00:00.000000500 |", - "| 5 | MT | 1970-01-01 00:00:00.000000005 |", "| 30 | MT | 1970-01-01 00:00:00.000000005 |", "| 1000 | MT | 1970-01-01 00:00:00.000001 |", "| 1000 | MT | 1970-01-01 00:00:00.000002 |", - "| 10 | MT | 1970-01-01 00:00:00.000007 |", "| 20 | MT | 1970-01-01 00:00:00.000007 |", "+-----------+------+-------------------------------+", ]; @@ -1038,27 +1026,18 @@ mod test { ); let batch = collect(plan.unwrap()).await.unwrap(); // Two overlapped chunks will be sort merged with dupplicates removed - // TODO: it is just sorted for now. When https://github.com/influxdata/influxdb_iox/issues/1646 - // is done, duplicates will be removed let expected = vec![ "+-----------+------+-------------------------------+", "| field_int | tag1 | time |", "+-----------+------+-------------------------------+", "| 100 | AL | 1970-01-01 00:00:00.000000050 |", - "| 10 | AL | 1970-01-01 00:00:00.000000050 |", - "| 100 | AL | 1970-01-01 00:00:00.000000050 |", - "| 70 | CT | 1970-01-01 00:00:00.000000100 |", "| 70 | CT | 1970-01-01 00:00:00.000000100 |", "| 70 | CT | 1970-01-01 00:00:00.000000500 |", - "| 5 | MT | 1970-01-01 00:00:00.000000005 |", "| 30 | MT | 1970-01-01 00:00:00.000000005 |", "| 1000 | MT | 1970-01-01 00:00:00.000001 |", - "| 1000 | MT | 1970-01-01 00:00:00.000001 |", "| 1000 | MT | 1970-01-01 00:00:00.000002 |", "| 5 | MT | 1970-01-01 00:00:00.000005 |", "| 10 | MT | 1970-01-01 00:00:00.000007 |", - "| 20 | MT | 1970-01-01 00:00:00.000007 |", - "| 10 | MT | 1970-01-01 00:00:00.000007 |", "+-----------+------+-------------------------------+", ]; assert_batches_eq!(&expected, &batch); @@ -1124,8 +1103,6 @@ mod test { // . chunk1 and chunk2 will be sorted merged and deduplicated (rows 8-32) // . chunk3 will stay in its original (rows 1-3) // . chunk4 will be sorted and deduplicated (rows 4-7) - // TODO: data is only partially sorted for now. 
The deduplication will happen when When https://github.com/influxdata/influxdb_iox/issues/1646
-        // is done
        let expected = vec![
            "+-----------+------+-------------------------------+",
            "| field_int | tag1 | time |",
            "+-----------+------+-------------------------------+",
            "| 1000 | WA | 1970-01-01 00:00:00.000008 |",
            "| 10 | VT | 1970-01-01 00:00:00.000010 |",
            "| 70 | UT | 1970-01-01 00:00:00.000020 |",
            "| 70 | UT | 1970-01-01 00:00:00.000020 |",
-           "| 10 | VT | 1970-01-01 00:00:00.000010 |",
            "| 50 | VT | 1970-01-01 00:00:00.000010 |",
            "| 1000 | WA | 1970-01-01 00:00:00.000008 |",
            "| 100 | AL | 1970-01-01 00:00:00.000000050 |",
-           "| 10 | AL | 1970-01-01 00:00:00.000000050 |",
-           "| 100 | AL | 1970-01-01 00:00:00.000000050 |",
-           "| 70 | CT | 1970-01-01 00:00:00.000000100 |",
            "| 70 | CT | 1970-01-01 00:00:00.000000100 |",
            "| 70 | CT | 1970-01-01 00:00:00.000000500 |",
-           "| 5 | MT | 1970-01-01 00:00:00.000000005 |",
            "| 30 | MT | 1970-01-01 00:00:00.000000005 |",
            "| 1000 | MT | 1970-01-01 00:00:00.000001 |",
-           "| 1000 | MT | 1970-01-01 00:00:00.000001 |",
            "| 1000 | MT | 1970-01-01 00:00:00.000002 |",
            "| 5 | MT | 1970-01-01 00:00:00.000005 |",
            "| 10 | MT | 1970-01-01 00:00:00.000007 |",
-           "| 20 | MT | 1970-01-01 00:00:00.000007 |",
-           "| 10 | MT | 1970-01-01 00:00:00.000007 |",
            "+-----------+------+-------------------------------+",
        ];
        assert_batches_eq!(&expected, &batch);
diff --git a/query/src/provider/deduplicate.rs b/query/src/provider/deduplicate.rs
new file mode 100644
index 0000000000..2633fb11c0
--- /dev/null
+++ b/query/src/provider/deduplicate.rs
@@ -0,0 +1,904 @@
+//! Implementation of DeduplicateExec operator (resolves primary key conflicts) plumbing and tests
+mod algo;
+
+use std::{fmt, sync::Arc};
+
+use arrow::{
+    datatypes::SchemaRef,
+    error::{ArrowError, Result as ArrowResult},
+    record_batch::RecordBatch,
+};
+use async_trait::async_trait;
+
+use self::algo::RecordBatchDeduplicator;
+use datafusion::{
+    error::{DataFusionError, Result},
+    physical_plan::{
+        expressions::PhysicalSortExpr, DisplayFormatType, Distribution, ExecutionPlan,
+        Partitioning, RecordBatchStream, SQLMetric, SendableRecordBatchStream,
+    },
+};
+use futures::{Stream, StreamExt};
+use hashbrown::HashMap;
+use observability_deps::tracing::debug;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+
+/// # DeduplicateExec
+///
+/// This operator takes an input stream of RecordBatches that is
+/// already sorted on "sort_key" and applies IOx specific deduplication
+/// logic.
+///
+/// The output is dependent on the order of the input rows which
+/// have the same key.
+///
+/// Specifically, the value chosen for each non-sort_key column is the
+/// "last" non-null value. This is used to model "upserts" when new
+/// rows with the same primary key are inserted a second time to update
+/// existing values.
+///
+/// # Example
+/// For example, given a sort key of (t1, t2) and the following input
+/// (already sorted on t1 and t2):
+///
+/// ```text
+/// +----+----+----+----+
+/// | t1 | t2 | f1 | f2 |
+/// +----+----+----+----+
+/// | a  | x  | 2  |    |
+/// | a  | x  | 2  | 1  |
+/// | a  | x  |    | 3  |
+/// | a  | y  | 3  | 1  |
+/// | b  | y  | 3  |    |
+/// | c  | y  | 1  | 1  |
+/// +----+----+----+----+
+/// ```
+///
+/// This operator will produce the following output (note the values
+/// chosen for (a, x)):
+///
+/// ```text
+/// +----+----+----+----+
+/// | t1 | t2 | f1 | f2 |
+/// +----+----+----+----+
+/// | a  | x  | 2  | 3  |
+/// | a  | y  | 3  | 1  |
+/// | b  | y  | 3  |    |
+/// | c  | y  | 1  | 1  |
+/// +----+----+----+----+
+/// ```
+///
+/// # Field Resolution (why the last non-null value?)
+///
+/// The choice of the latest non-null value instead of the latest value is
+/// subtle and thus we try to document the rationale here. It is a
+/// consequence of the LineProtocol update model.
+///
+/// Some observations about line protocol are:
+///
+/// 1. Lines are treated as "UPSERT"s (aka updating any existing
+/// values, possibly adding new fields)
+///
+/// 2. Fields can not be removed or set to NULL via a line (So if a
+/// field has a NULL value it means the user didn't provide a value
+/// for that field)
+///
+/// For example, this data (with a NULL for `f2`):
+///
+/// ```text
+/// t1 | f1 | f2
+/// ---+----+----
+/// a  | 1  | 3
+/// a  | 2  |
+/// ```
+///
+/// Would have come from line protocol like
+/// ```text
+/// m,t1=a f1=1,f2=3
+/// m,t1=a f1=2
+/// ```
+/// (note there was no value for f2 provided in the second line, it can
+/// be read as "upsert value of f1=2, the value of f2 is not modified").
+///
+/// Thus it would not be correct to take the latest value from f2
+/// (NULL) as in the source input the field's value was not provided.
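+///
+/// # Duplicates across batches
+///
+/// Duplicate primary keys may also straddle `RecordBatch` boundaries;
+/// the last key group of each batch is held back and merged with the
+/// following batch before it is emitted (see `RecordBatchDeduplicator`
+/// in `algo.rs`). For illustration, the `test_multi_record_batch` test
+/// below feeds in:
+///
+/// ```text
+/// t1 | t2 | f1 | f2
+/// ---+----+----+----
+/// a  | b  | 1  | 2
+/// a  | c  | 3  |
+/// a  | c  | 4  | 5
+/// ====(next batch)====
+/// a  | c  |    | 6
+/// b  | d  | 7  | 8
+/// ```
+///
+/// and expects a single `(a, c)` output row with `f1 = 4` and `f2 = 6`.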
+#[derive(Debug)]
+pub struct DeduplicateExec {
+    input: Arc<dyn ExecutionPlan>,
+    sort_keys: Vec<PhysicalSortExpr>,
+    num_dupes: Arc<SQLMetric>,
+}
+
+impl DeduplicateExec {
+    pub fn new(input: Arc<dyn ExecutionPlan>, sort_keys: Vec<PhysicalSortExpr>) -> Self {
+        let num_dupes = SQLMetric::counter();
+        Self {
+            input,
+            sort_keys,
+            num_dupes,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for DeduplicateExec {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> arrow::datatypes::SchemaRef {
+        self.input.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![Arc::clone(&self.input)]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        assert_eq!(children.len(), 1);
+        let input = Arc::clone(&children[0]);
+        Ok(Arc::new(Self::new(input, self.sort_keys.clone())))
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if partition != 0 {
+            return Err(DataFusionError::Internal(
+                "DeduplicateExec only supports a single input stream".to_string(),
+            ));
+        }
+
+        let input_stream = self.input.execute(0).await?;
+
+        // the deduplication is performed in a separate task which is
+        // then sent via a channel to the output
+        let (tx, rx) = mpsc::channel(1);
+
+        let task = tokio::task::spawn(deduplicate(
+            input_stream,
+            self.sort_keys.clone(),
+            tx.clone(),
+            Arc::clone(&self.num_dupes),
+        ));
+
+        // A second task watches the output of the worker task
+        tokio::task::spawn(async move {
+            let task_result = task.await;
+
+            let msg = match task_result {
+                Err(join_err) => {
+                    debug!(e=%join_err, "Error joining deduplicate task");
+                    Some(ArrowError::ExternalError(Box::new(join_err)))
+                }
+                Ok(Err(e)) => {
+                    debug!(%e, "Error in deduplicate task itself");
+                    Some(e)
+                }
+                Ok(Ok(())) => {
+                    // successful
+                    None
+                }
+            };
+
+            if let Some(e) = msg {
+                // try and tell the receiver something went
+                // wrong. Note we ignore errors sending this message
+                // as that means the receiver has already been
+                // shutdown and no one cares anymore lol
+                if tx.send(Err(e)).await.is_err() {
+                    debug!("deduplicate receiver hung up");
+                }
+            }
+        });
+
+        let stream = AdapterStream {
+            schema: self.schema(),
+            inner: ReceiverStream::new(rx),
+        };
+        Ok(Box::pin(stream))
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::SinglePartition
+    }
+
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.sort_keys.iter().map(|e| e.to_string()).collect();
+                write!(f, "DeduplicateExec: [{}]", expr.join(","))
+            }
+        }
+    }
+
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert("numDuplicates".to_owned(), self.num_dupes.as_ref().clone());
+        metrics
+    }
+}
+
+async fn deduplicate(
+    mut input_stream: SendableRecordBatchStream,
+    sort_keys: Vec<PhysicalSortExpr>,
+    tx: mpsc::Sender<ArrowResult<RecordBatch>>,
+    num_dupes: Arc<SQLMetric>,
+) -> ArrowResult<()> {
+    let mut deduplicator = RecordBatchDeduplicator::new(sort_keys, num_dupes);
+
+    // Stream input through the indexer
+    while let Some(batch) = input_stream.next().await {
+        let batch = batch?;
+        let output_batch = deduplicator.push(batch)?;
+        tx.send(Ok(output_batch))
+            .await
+            .map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
+    }
+
+    // send any left over batch
+    if let Some(output_batch) = deduplicator.finish()?
{ + tx.send(Ok(output_batch)) + .await + .map_err(|e| ArrowError::from_external_error(Box::new(e)))?; + } + + Ok(()) +} + +#[derive(Debug)] +struct AdapterStream { + /// Schema + schema: SchemaRef, + /// channel for getting deduplicated batches + inner: ReceiverStream>, +} + +impl Stream for AdapterStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_next_unpin(cx) + } +} + +impl RecordBatchStream for AdapterStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +#[cfg(test)] +mod test { + use arrow::compute::SortOptions; + use arrow::{ + array::{ArrayRef, Float64Array, StringArray}, + record_batch::RecordBatch, + }; + use arrow_util::assert_batches_eq; + use datafusion::physical_plan::{collect, expressions::col, memory::MemoryExec}; + + use super::*; + + #[tokio::test] + async fn test_single_tag() { + // input: + // t1 | f1 | f2 + // ---+----+---- + // a | 1 | + // a | 2 | 3 + // a | | 4 + // b | 5 | 6 + // c | 7 | + // c | | + // c | | 8 + // + // expected output: + // + // t1 | f1 | f2 + // ---+----+---- + // a | 2 | 4 + // b | 5 | 6 + // c | 7 | 8 + + let t1 = StringArray::from(vec![ + Some("a"), + Some("a"), + Some("a"), + Some("b"), + Some("c"), + Some("c"), + Some("c"), + ]); + let f1 = Float64Array::from(vec![ + Some(1.0), + Some(2.0), + None, + Some(5.0), + Some(7.0), + None, + None, + ]); + let f2 = Float64Array::from(vec![ + None, + Some(3.0), + Some(4.0), + Some(6.0), + None, + None, + Some(8.0), + ]); + + let batch = RecordBatch::try_from_iter(vec![ + ("t1", Arc::new(t1) as ArrayRef), + ("f1", Arc::new(f1) as ArrayRef), + ("f2", Arc::new(f2) as ArrayRef), + ]) + .unwrap(); + + let sort_keys = vec![PhysicalSortExpr { + expr: col("t1"), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]; + + let results = dedupe(vec![batch], sort_keys).await; + + let expected = vec![ + "+----+----+----+", + "| t1 | f1 | f2 |", + "+----+----+----+", + "| a | 2 | 4 |", + "| b | 5 | 6 |", + "| c | 7 | 8 |", + "+----+----+----+", + ]; + assert_batches_eq!(&expected, &results.output); + } + + #[tokio::test] + async fn test_multi_tag() { + // input: + // t1 | t2 | f1 | f2 + // ---+----+----+---- + // a | b | 1 | + // a | b | 2 | 3 + // a | b | | 4 + // a | z | 5 | + // b | b | 6 | + // b | c | 7 | 6 + // c | c | 8 | + // d | b | | 9 + // e | | 10 | 11 + // e | | 12 | + // | f | 13 | + // | f | | 14 + // + // expected output: + // t1 | t2 | f1 | f2 + // ---+----+----+---- + // a | b | 2 | 4 + // a | z | 5 | + // b | b | 6 | + // b | c | 7 | 6 + // c | c | 8 | + // d | b | | 9 + // e | | 12 | 11 + // | f | 13 | 14 + + let t1 = StringArray::from(vec![ + Some("a"), + Some("a"), + Some("a"), + Some("a"), + Some("b"), + Some("b"), + Some("c"), + Some("d"), + Some("e"), + Some("e"), + None, + None, + ]); + + let t2 = StringArray::from(vec![ + Some("b"), + Some("b"), + Some("b"), + Some("z"), + Some("b"), + Some("c"), + Some("c"), + Some("b"), + None, + None, + Some("f"), + Some("f"), + ]); + + let f1 = Float64Array::from(vec![ + Some(1.0), + Some(2.0), + None, + Some(5.0), + Some(6.0), + Some(7.0), + Some(8.0), + None, + Some(10.0), + Some(12.0), + Some(13.0), + None, + ]); + + let f2 = Float64Array::from(vec![ + None, + Some(3.0), + Some(4.0), + None, + None, + Some(6.0), + None, + Some(9.0), + Some(11.0), + None, + None, + Some(14.0), + ]); + + let batch = RecordBatch::try_from_iter(vec![ + ("t1", Arc::new(t1) as ArrayRef), + ("t2", 
Arc::new(t2) as ArrayRef), + ("f1", Arc::new(f1) as ArrayRef), + ("f2", Arc::new(f2) as ArrayRef), + ]) + .unwrap(); + + let sort_keys = vec![ + PhysicalSortExpr { + expr: col("t1"), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }, + PhysicalSortExpr { + expr: col("t2"), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }, + ]; + + let results = dedupe(vec![batch], sort_keys).await; + + let expected = vec![ + "+----+----+----+----+", + "| t1 | t2 | f1 | f2 |", + "+----+----+----+----+", + "| a | b | 2 | 4 |", + "| a | z | 5 | |", + "| b | b | 6 | |", + "| b | c | 7 | 6 |", + "| c | c | 8 | |", + "| d | b | | 9 |", + "| e | | 12 | 11 |", + "| | f | 13 | 14 |", + "+----+----+----+----+", + ]; + assert_batches_eq!(&expected, &results.output); + } + + #[tokio::test] + async fn test_multi_record_batch() { + // input: + // t1 | t2 | f1 | f2 + // ---+----+----+---- + // a | b | 1 | 2 + // a | c | 3 | + // a | c | 4 | 5 + // ====(next batch)==== + // a | c | | 6 + // b | d | 7 | 8 + + // + // expected output: + // t1 | t2 | f1 | f2 + // ---+----+----+---- + // a | b | 1 | 2 + // a | c | 4 | 6 + // b | d | 7 | 8 + + let t1 = StringArray::from(vec![Some("a"), Some("a"), Some("a")]); + + let t2 = StringArray::from(vec![Some("b"), Some("c"), Some("c")]); + + let f1 = Float64Array::from(vec![Some(1.0), Some(3.0), Some(4.0)]); + + let f2 = Float64Array::from(vec![Some(2.0), None, Some(5.0)]); + + let batch1 = RecordBatch::try_from_iter(vec![ + ("t1", Arc::new(t1) as ArrayRef), + ("t2", Arc::new(t2) as ArrayRef), + ("f1", Arc::new(f1) as ArrayRef), + ("f2", Arc::new(f2) as ArrayRef), + ]) + .unwrap(); + + let t1 = StringArray::from(vec![Some("a"), Some("b")]); + + let t2 = StringArray::from(vec![Some("b"), Some("d")]); + + let f1 = Float64Array::from(vec![None, Some(7.0)]); + + let f2 = Float64Array::from(vec![Some(6.0), Some(8.0)]); + + let batch2 = RecordBatch::try_from_iter(vec![ + ("t1", Arc::new(t1) as ArrayRef), + ("t2", Arc::new(t2) as ArrayRef), + ("f1", Arc::new(f1) as ArrayRef), + ("f2", Arc::new(f2) as ArrayRef), + ]) + .unwrap(); + + let sort_keys = vec![ + PhysicalSortExpr { + expr: col("t1"), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }, + PhysicalSortExpr { + expr: col("t2"), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }, + ]; + + let results = dedupe(vec![batch1, batch2], sort_keys).await; + + let expected = vec![ + "+----+----+----+----+", + "| t1 | t2 | f1 | f2 |", + "+----+----+----+----+", + "| a | b | 1 | 2 |", + "| a | c | 4 | 6 |", + "| b | d | 7 | 8 |", + "+----+----+----+----+", + ]; + assert_batches_eq!(&expected, &results.output); + // 5 rows in initial input, 3 rows in output ==> 2 dupes + assert_eq!(results.num_dupes(), 5 - 3); + } + + #[tokio::test] + async fn test_no_dupes() { + // special case test for data without duplicates (fast path) + // input: + // t1 | f1 + // ---+---- + // a | 1 + // ====(next batch)==== + // b | 2 + // + // expected output: + // + // t1 | f1 + // ---+---- + // a | 1 + // b | 2 + + let t1 = StringArray::from(vec![Some("a")]); + let f1 = Float64Array::from(vec![Some(1.0)]); + + let batch1 = RecordBatch::try_from_iter(vec![ + ("t1", Arc::new(t1) as ArrayRef), + ("f1", Arc::new(f1) as ArrayRef), + ]) + .unwrap(); + + let t1 = StringArray::from(vec![Some("b")]); + let f1 = Float64Array::from(vec![Some(2.0)]); + + let batch2 = RecordBatch::try_from_iter(vec![ + ("t1", Arc::new(t1) as ArrayRef), + ("f1", Arc::new(f1) as ArrayRef), + ]) + 
.unwrap(); + + let sort_keys = vec![PhysicalSortExpr { + expr: col("t1"), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]; + + let results = dedupe(vec![batch1, batch2], sort_keys).await; + + let expected = vec![ + "+----+----+", + "| t1 | f1 |", + "+----+----+", + "| a | 1 |", + "| b | 2 |", + "+----+----+", + ]; + assert_batches_eq!(&expected, &results.output); + + // also validate there were no dupes detected + assert_eq!(results.num_dupes(), 0); + } + + #[tokio::test] + async fn test_single_pk() { + // test boundary condition + + // input: + // t1 | f1 | f2 + // ---+----+---- + // a | 1 | 2 + // a | 3 | 4 + // + // expected output: + // + // t1 | f1 | f2 + // ---+----+---- + // a | 3 | 4 + + let t1 = StringArray::from(vec![Some("a"), Some("a")]); + let f1 = Float64Array::from(vec![Some(1.0), Some(3.0)]); + let f2 = Float64Array::from(vec![Some(2.0), Some(4.0)]); + + let batch = RecordBatch::try_from_iter(vec![ + ("t1", Arc::new(t1) as ArrayRef), + ("f1", Arc::new(f1) as ArrayRef), + ("f2", Arc::new(f2) as ArrayRef), + ]) + .unwrap(); + + let sort_keys = vec![PhysicalSortExpr { + expr: col("t1"), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]; + + let results = dedupe(vec![batch], sort_keys).await; + + let expected = vec![ + "+----+----+----+", + "| t1 | f1 | f2 |", + "+----+----+----+", + "| a | 3 | 4 |", + "+----+----+----+", + ]; + assert_batches_eq!(&expected, &results.output); + } + + #[tokio::test] + async fn test_column_reorder() { + // test if they fields come before tags and tags not in right order + + // input: + // f1 | t2 | t1 + // ---+----+---- + // 1 | a | a + // 2 | a | a + // 3 | a | b + // 4 | b | b + // + // expected output: + // + // f1 | t2 | t1 + // ---+----+---- + // 2 | a | a + // 3 | a | b + // 4 | b | b + + let f1 = Float64Array::from(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]); + let t2 = StringArray::from(vec![Some("a"), Some("a"), Some("a"), Some("b")]); + let t1 = StringArray::from(vec![Some("a"), Some("a"), Some("b"), Some("b")]); + + let batch = RecordBatch::try_from_iter(vec![ + ("f1", Arc::new(f1) as ArrayRef), + ("t2", Arc::new(t2) as ArrayRef), + ("t1", Arc::new(t1) as ArrayRef), + ]) + .unwrap(); + + let sort_keys = vec![ + PhysicalSortExpr { + expr: col("t1"), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }, + PhysicalSortExpr { + expr: col("t2"), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }, + ]; + + let results = dedupe(vec![batch], sort_keys).await; + + let expected = vec![ + "+----+----+----+", + "| f1 | t2 | t1 |", + "+----+----+----+", + "| 2 | a | a |", + "| 3 | a | b |", + "| 4 | b | b |", + "+----+----+----+", + ]; + assert_batches_eq!(&expected, &results.output); + } + + #[tokio::test] + async fn test_input_error_propagated() { + // test that an error from the input gets to the output + + // input: + // t1 | f1 + // ---+---- + // a | 1 + // === next batch === + // (error) + + let t1 = StringArray::from(vec![Some("a")]); + let f1 = Float64Array::from(vec![Some(1.0)]); + + let batch = RecordBatch::try_from_iter(vec![ + ("t1", Arc::new(t1) as ArrayRef), + ("f1", Arc::new(f1) as ArrayRef), + ]) + .unwrap(); + + let schema = batch.schema(); + let batches = vec![ + Ok(batch), + Err(ArrowError::ComputeError("This is the error".to_string())), + ]; + + let input = Arc::new(DummyExec { + schema: Arc::clone(&schema), + batches, + }); + + let sort_keys = vec![PhysicalSortExpr { + expr: col("t1"), + options: SortOptions { + 
descending: false, + nulls_first: false, + }, + }]; + + let exec = Arc::new(DeduplicateExec::new(input, sort_keys)); + let output = collect(Arc::clone(&exec) as Arc) + .await + .unwrap_err() + .to_string(); + assert!( + output.contains("Compute error: This is the error"), + "actual output: {}", + output + ); + } + + struct TestResults { + output: Vec, + exec: Arc, + } + + impl TestResults { + /// return the number of duplicates this deduplicator detected + fn num_dupes(&self) -> usize { + self.exec + .metrics() + .get("numDuplicates") + .expect("No dupe metrics found") + .value() + } + } + + /// Run the input through the deduplicator and return results + async fn dedupe(input: Vec, sort_keys: Vec) -> TestResults { + test_helpers::maybe_start_logging(); + + // Setup in memory stream + let schema = input[0].schema(); + let projection = None; + let input = Arc::new(MemoryExec::try_new(&[input], schema, projection).unwrap()); + + // Create and run the deduplicator + let exec = Arc::new(DeduplicateExec::new(input, sort_keys)); + let output = collect(Arc::clone(&exec) as Arc) + .await + .unwrap(); + + TestResults { output, exec } + } + + /// A PhysicalPlan that sends a specific set of + /// Result for testing. + #[derive(Debug)] + struct DummyExec { + schema: SchemaRef, + batches: Vec>, + } + + #[async_trait] + impl ExecutionPlan for DummyExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn output_partitioning(&self) -> Partitioning { + unimplemented!(); + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + &self, + _children: Vec>, + ) -> Result> { + unimplemented!() + } + + async fn execute(&self, partition: usize) -> Result { + assert_eq!(partition, 0); + + // ensure there is space to queue up the channel + let (tx, rx) = mpsc::channel(self.batches.len()); + + // queue up all the results + for r in &self.batches { + match r { + Ok(batch) => tx.send(Ok(batch.clone())).await.unwrap(), + Err(e) => tx.send(Err(clone_error(e))).await.unwrap(), + } + } + + let stream = AdapterStream { + schema: self.schema(), + inner: ReceiverStream::new(rx), + }; + Ok(Box::pin(stream)) + } + } + + fn clone_error(e: &ArrowError) -> ArrowError { + use ArrowError::*; + match e { + ComputeError(msg) => ComputeError(msg.to_string()), + _ => unimplemented!(), + } + } +} diff --git a/query/src/provider/deduplicate/algo.rs b/query/src/provider/deduplicate/algo.rs new file mode 100644 index 0000000000..4dcab6939e --- /dev/null +++ b/query/src/provider/deduplicate/algo.rs @@ -0,0 +1,247 @@ +//! 
Implementation of Deduplication algorithm
+
+use std::{ops::Range, sync::Arc};
+
+use arrow::{
+    array::{ArrayRef, UInt64Array},
+    compute::TakeOptions,
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+
+use datafusion::physical_plan::{
+    coalesce_batches::concat_batches, expressions::PhysicalSortExpr, PhysicalExpr, SQLMetric,
+};
+use observability_deps::tracing::debug;
+
+// Handles the deduplication across potentially multiple
+// [`RecordBatch`]es which are already sorted on a primary key,
+// including primary keys which straddle RecordBatch boundaries
+#[derive(Debug)]
+pub(crate) struct RecordBatchDeduplicator {
+    sort_keys: Vec<PhysicalSortExpr>,
+    last_batch: Option<RecordBatch>,
+    num_dupes: Arc<SQLMetric>,
+}
+
+#[derive(Debug)]
+struct DuplicateRanges {
+    /// `is_sort_key[col_idx] = true` if the input column at
+    /// `col_idx` is present in sort keys
+    is_sort_key: Vec<bool>,
+
+    /// ranges of row indices where the sort key columns have the
+    /// same values
+    ranges: Vec<Range<usize>>,
+}
+
+impl RecordBatchDeduplicator {
+    pub fn new(sort_keys: Vec<PhysicalSortExpr>, num_dupes: Arc<SQLMetric>) -> Self {
+        Self {
+            sort_keys,
+            last_batch: None,
+            num_dupes,
+        }
+    }
+
+    /// Push a new RecordBatch into the indexer. Returns a
+    /// deduplicated RecordBatch and remembers any currently opened
+    /// groups
+    pub fn push(&mut self, batch: RecordBatch) -> ArrowResult<RecordBatch> {
+        // If we had a previous batch of rows, add it in here
+        //
+        // Potential optimization would be to check if the sort key is actually the same
+        // for the first row in the new batch and skip this concat if that is the case
+        let batch = if let Some(last_batch) = self.last_batch.take() {
+            let schema = last_batch.schema();
+            let row_count = last_batch.num_rows() + batch.num_rows();
+            concat_batches(&schema, &[last_batch, batch], row_count)?
+        } else {
+            batch
+        };
+
+        let mut dupe_ranges = self.compute_ranges(&batch)?;
+
+        // The last partition may span batches so we can't emit it
+        // until we have seen the next batch (or we are at end of
+        // stream)
+        let last_range = dupe_ranges.ranges.pop();
+
+        let output_record_batch = self.output_from_ranges(&batch, &dupe_ranges)?;
+
+        // Now, save the last bit of the pk
+        if let Some(last_range) = last_range {
+            let len = last_range.end - last_range.start;
+            let last_batch = Self::slice_record_batch(&batch, last_range.start, len)?;
+            self.last_batch = Some(last_batch);
+        }
+
+        Ok(output_record_batch)
+    }
+
+    /// Consume the indexer, returning any remaining record batches for output
+    pub fn finish(mut self) -> ArrowResult<Option<RecordBatch>> {
+        self.last_batch
+            .take()
+            .map(|last_batch| {
+                let dupe_ranges = self.compute_ranges(&last_batch)?;
+                self.output_from_ranges(&last_batch, &dupe_ranges)
+            })
+            .transpose()
+    }
+
+    /// Computes the ranges where the sort key has the same values
+    fn compute_ranges(&self, batch: &RecordBatch) -> ArrowResult<DuplicateRanges> {
+        let schema = batch.schema();
+        // is_sort_key[col_idx] = true if it is present in sort keys
+        let mut is_sort_key: Vec<bool> = vec![false; batch.columns().len()];
+
+        // Figure out where the partitions are:
+        let columns: Vec<_> = self
+            .sort_keys
+            .iter()
+            .map(|skey| {
+                // figure out what input column this is for
+                let name = get_col_name(skey.expr.as_ref());
+                let index = schema.index_of(name).unwrap();
+
+                is_sort_key[index] = true;
+
+                let array = batch.column(index);
+
+                arrow::compute::SortColumn {
+                    values: Arc::clone(array),
+                    options: Some(skey.options),
+                }
+            })
+            .collect();
+
+        // Compute partitions (aka breakpoints between the ranges)
+        let ranges =
arrow::compute::lexicographical_partition_ranges(&columns)?;
+
+        Ok(DuplicateRanges {
+            is_sort_key,
+            ranges,
+        })
+    }
+
+    /// Compute the output record batch that includes the specified ranges
+    fn output_from_ranges(
+        &self,
+        batch: &RecordBatch,
+        dupe_ranges: &DuplicateRanges,
+    ) -> ArrowResult<RecordBatch> {
+        let ranges = &dupe_ranges.ranges;
+
+        // each range is at least 1 large, so any that have more than
+        // 1 are duplicates
+        let num_dupes = ranges.iter().map(|r| r.end - r.start - 1).sum();
+
+        self.num_dupes.add(num_dupes);
+
+        // Special case when no ranges are duplicated (so just emit input as output)
+        if num_dupes == 0 {
+            debug!(num_rows = batch.num_rows(), "No dupes");
+            Self::slice_record_batch(&batch, 0, ranges.len())
+        } else {
+            debug!(num_dupes, num_rows = batch.num_rows(), "dupes");
+
+            // Use take kernel
+            let sort_key_indices = self.compute_sort_key_indices(&ranges);
+
+            let take_options = Some(TakeOptions {
+                check_bounds: false,
+            });
+
+            // Form each new column by `take`ing the indices as needed
+            let new_columns = batch
+                .columns()
+                .iter()
+                .enumerate()
+                .map(|(input_index, input_array)| {
+                    if dupe_ranges.is_sort_key[input_index] {
+                        arrow::compute::take(
+                            input_array.as_ref(),
+                            &sort_key_indices,
+                            take_options.clone(),
+                        )
+                    } else {
+                        // pick the last non null value
+                        let field_indices = self.compute_field_indices(&ranges, input_array);
+
+                        arrow::compute::take(
+                            input_array.as_ref(),
+                            &field_indices,
+                            take_options.clone(),
+                        )
+                    }
+                })
+                .collect::<ArrowResult<Vec<ArrayRef>>>()?;
+            RecordBatch::try_new(batch.schema(), new_columns)
+        }
+    }
+
+    /// Returns an array of indices, one for each input range (which
+    /// index is arbitrary as all the values are the same for the sort
+    /// column in each pk group)
+    ///
+    /// ranges: 0-1, 2-4, 5-6 --> Array[0, 2, 5]
+    fn compute_sort_key_indices(&self, ranges: &[Range<usize>]) -> UInt64Array {
+        ranges.iter().map(|r| Some(r.start as u64)).collect()
+    }
+
+    /// Returns an array of indices, one for each input range, that
+    /// picks the last non-null value of `input_array` in that range
+    /// (aka it will pick the index of the field value to use for each
+    /// pk group)
+    ///
+    /// ranges: 0-1, 2-4, 5-6
+    /// input array: A, NULL, NULL, C, NULL, NULL
+    /// --> Array[0, 3, 5]
+    fn compute_field_indices(
+        &self,
+        ranges: &[Range<usize>],
+        input_array: &ArrayRef,
+    ) -> UInt64Array {
+        ranges
+            .iter()
+            .map(|r| {
+                let value_index = r
+                    .clone()
+                    .filter(|&i| input_array.is_valid(i))
+                    .last()
+                    .map(|i| i as u64)
+                    // if all field values are none, pick one arbitrarily
+                    .unwrap_or(r.start as u64);
+                Some(value_index)
+            })
+            .collect()
+    }
+
+    /// Create a new record batch from offset --> len
+    ///
+    /// https://github.com/apache/arrow-rs/issues/460 for adding this upstream
+    fn slice_record_batch(
+        batch: &RecordBatch,
+        offset: usize,
+        len: usize,
+    ) -> ArrowResult<RecordBatch> {
+        let schema = batch.schema();
+        let new_columns: Vec<_> = batch
+            .columns()
+            .iter()
+            .map(|old_column| old_column.slice(offset, len))
+            .collect();
+
+        RecordBatch::try_new(schema, new_columns)
+    }
+}
+
+/// Get column name out of the `expr`. TODO use
+/// internal_types::schema::SortKey instead.
+fn get_col_name(expr: &dyn PhysicalExpr) -> &str {
+    expr.as_any()
+        .downcast_ref::<datafusion::physical_plan::expressions::Column>()
+        .expect("expected column reference")
+        .name()
+}
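For readers who want the field-resolution rule in isolation, the following is a minimal, self-contained sketch of what `compute_field_indices` does per duplicate range: walk each run of equal sort-key values and keep the last non-null field value. It is plain Rust with no Arrow dependency, and the function name `dedupe_last_non_null` is illustrative only, not part of this PR.

```rust
// Minimal sketch of the "last non-null value per duplicate range" rule.
// `keys` is assumed to be sorted (as DeduplicateExec requires of its input);
// each run of equal keys collapses to one output row whose field value is the
// last non-null value in that run (None if the whole run is null).
fn dedupe_last_non_null(keys: &[&str], field: &[Option<f64>]) -> Vec<(String, Option<f64>)> {
    assert_eq!(keys.len(), field.len());
    let mut out = Vec::new();
    let mut start = 0;
    while start < keys.len() {
        // find the end of the run of rows sharing the same key
        let mut end = start + 1;
        while end < keys.len() && keys[end] == keys[start] {
            end += 1;
        }
        // walk the run backwards and take the first non-null value found
        let value = (start..end).rev().find_map(|i| field[i]);
        out.push((keys[start].to_string(), value));
        start = end;
    }
    out
}

fn main() {
    // Mirrors the single-tag test above: f2 resolves to 4.0 for "a",
    // 6.0 for "b" and 8.0 for "c".
    let keys = ["a", "a", "a", "b", "c", "c", "c"];
    let f2 = [None, Some(3.0), Some(4.0), Some(6.0), None, None, Some(8.0)];
    assert_eq!(
        dedupe_last_non_null(&keys, &f2),
        vec![
            ("a".to_string(), Some(4.0)),
            ("b".to_string(), Some(6.0)),
            ("c".to_string(), Some(8.0)),
        ]
    );
}
```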