From 9d8bceccbf93d0614ef918af552ab76ce5417185 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 4 Mar 2022 15:05:17 -0500 Subject: [PATCH] test: Add test to verify deduplicating is working (#3937) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- query/src/provider/deduplicate.rs | 148 +++++++++++++++++++++++++++++- 1 file changed, 146 insertions(+), 2 deletions(-) diff --git a/query/src/provider/deduplicate.rs b/query/src/provider/deduplicate.rs index 0147d336fa..6fb224fd12 100644 --- a/query/src/provider/deduplicate.rs +++ b/query/src/provider/deduplicate.rs @@ -340,7 +340,7 @@ mod test { use arrow::compute::SortOptions; use arrow::datatypes::{Int32Type, SchemaRef}; use arrow::{ - array::{ArrayRef, Float64Array, StringArray}, + array::{ArrayRef, Float64Array, StringArray, TimestampNanosecondArray}, record_batch::RecordBatch, }; use arrow_util::assert_batches_eq; @@ -348,7 +348,7 @@ mod test { use datafusion_util::test_collect; use super::*; - use arrow::array::DictionaryArray; + use arrow::array::{DictionaryArray, Int64Array}; use std::iter::FromIterator; #[tokio::test] @@ -429,6 +429,51 @@ mod test { assert_batches_eq!(&expected, &results.output); } + #[tokio::test] + async fn test_with_timestamp() { + // input: + // f1 | f2 | time + // ---+----+------ + // 1 | | 100 + // | 3 | 100 + // + // expected output: + // + // f1 | f2 | time + // ---+----+------- + // 1 | 3 | 100 + let f1 = Float64Array::from(vec![Some(1.0), None]); + let f2 = Float64Array::from(vec![None, Some(3.0)]); + + let time = TimestampNanosecondArray::from(vec![Some(100), Some(100)]); + + let batch = RecordBatch::try_from_iter(vec![ + ("f1", Arc::new(f1) as ArrayRef), + ("f2", Arc::new(f2) as ArrayRef), + ("time", Arc::new(time) as ArrayRef), + ]) + .unwrap(); + + let sort_keys = vec![PhysicalSortExpr { + expr: col("time", &batch.schema()).unwrap(), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]; + + let results = dedupe(vec![batch], sort_keys).await; + + let expected = vec![ + "+----+----+--------------------------------+", + "| f1 | f2 | time |", + "+----+----+--------------------------------+", + "| 1 | 3 | 1970-01-01T00:00:00.000000100Z |", + "+----+----+--------------------------------+", + ]; + assert_batches_eq!(&expected, &results.output); + } + #[tokio::test] async fn test_multi_tag() { // input: @@ -563,6 +608,105 @@ mod test { assert_batches_eq!(&expected, &results.output); } + #[tokio::test] + async fn test_string_with_timestamp() { + // input: + // s | i | time + // -------+----+------ + // "cat" | | 100 + // | 3 | 100 + // | 4 | 200 + // "dog" | | 200 + // + // expected output: + // + // s | i | time + // -------+----+------- + // "cat" | 3 | 100 + // "dog" | 4 | 200 + let s = StringArray::from(vec![Some("cat"), None, None, Some("dog")]); + + let i = Int64Array::from(vec![None, Some(3), Some(4), None]); + + let time = TimestampNanosecondArray::from(vec![Some(100), Some(100), Some(200), Some(200)]); + + let batch = RecordBatch::try_from_iter(vec![ + ("s", Arc::new(s) as ArrayRef), + ("i", Arc::new(i) as ArrayRef), + ("time", Arc::new(time) as ArrayRef), + ]) + .unwrap(); + + let sort_keys = vec![PhysicalSortExpr { + expr: col("time", &batch.schema()).unwrap(), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]; + + let results = dedupe(vec![batch], sort_keys).await; + + let expected = vec![ + "+-----+---+--------------------------------+", + "| s | i | time |", + "+-----+---+--------------------------------+", + "| cat | 3 | 1970-01-01T00:00:00.000000100Z |", + "| dog | 4 | 1970-01-01T00:00:00.000000200Z |", + "+-----+---+--------------------------------+", + ]; + assert_batches_eq!(&expected, &results.output); + } + + #[tokio::test] + async fn test_last_is_null_with_timestamp() { + // input: + // s | i | time + // -------+----+------ + // "cat" | | 1639612800000000000 + // | 10 | 1639612800000000000 + // + // expected output: + // + // s | i | time + // -------+----+------- + // "cat" | 10 | 1639612800000000000 + let s = StringArray::from(vec![Some("cat"), None]); + + let i = Int64Array::from(vec![None, Some(10)]); + + let time = TimestampNanosecondArray::from(vec![ + Some(1639612800000000000), + Some(1639612800000000000), + ]); + + let batch = RecordBatch::try_from_iter(vec![ + ("s", Arc::new(s) as ArrayRef), + ("i", Arc::new(i) as ArrayRef), + ("time", Arc::new(time) as ArrayRef), + ]) + .unwrap(); + + let sort_keys = vec![PhysicalSortExpr { + expr: col("time", &batch.schema()).unwrap(), + options: SortOptions { + descending: false, + nulls_first: true, + }, + }]; + + let results = dedupe(vec![batch], sort_keys).await; + + let expected = vec![ + "+-----+----+----------------------+", + "| s | i | time |", + "+-----+----+----------------------+", + "| cat | 10 | 2021-12-16T00:00:00Z |", + "+-----+----+----------------------+", + ]; + assert_batches_eq!(&expected, &results.output); + } + #[tokio::test] async fn test_multi_record_batch() { // input: