test: Add test to verify deduplicating is working (#3937)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
e09f39d6a0
commit
9d8bceccbf
|
@ -340,7 +340,7 @@ mod test {
|
|||
use arrow::compute::SortOptions;
|
||||
use arrow::datatypes::{Int32Type, SchemaRef};
|
||||
use arrow::{
|
||||
array::{ArrayRef, Float64Array, StringArray},
|
||||
array::{ArrayRef, Float64Array, StringArray, TimestampNanosecondArray},
|
||||
record_batch::RecordBatch,
|
||||
};
|
||||
use arrow_util::assert_batches_eq;
|
||||
|
@ -348,7 +348,7 @@ mod test {
|
|||
use datafusion_util::test_collect;
|
||||
|
||||
use super::*;
|
||||
use arrow::array::DictionaryArray;
|
||||
use arrow::array::{DictionaryArray, Int64Array};
|
||||
use std::iter::FromIterator;
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -429,6 +429,51 @@ mod test {
|
|||
assert_batches_eq!(&expected, &results.output);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_with_timestamp() {
|
||||
// input:
|
||||
// f1 | f2 | time
|
||||
// ---+----+------
|
||||
// 1 | | 100
|
||||
// | 3 | 100
|
||||
//
|
||||
// expected output:
|
||||
//
|
||||
// f1 | f2 | time
|
||||
// ---+----+-------
|
||||
// 1 | 3 | 100
|
||||
let f1 = Float64Array::from(vec![Some(1.0), None]);
|
||||
let f2 = Float64Array::from(vec![None, Some(3.0)]);
|
||||
|
||||
let time = TimestampNanosecondArray::from(vec![Some(100), Some(100)]);
|
||||
|
||||
let batch = RecordBatch::try_from_iter(vec![
|
||||
("f1", Arc::new(f1) as ArrayRef),
|
||||
("f2", Arc::new(f2) as ArrayRef),
|
||||
("time", Arc::new(time) as ArrayRef),
|
||||
])
|
||||
.unwrap();
|
||||
|
||||
let sort_keys = vec![PhysicalSortExpr {
|
||||
expr: col("time", &batch.schema()).unwrap(),
|
||||
options: SortOptions {
|
||||
descending: false,
|
||||
nulls_first: false,
|
||||
},
|
||||
}];
|
||||
|
||||
let results = dedupe(vec![batch], sort_keys).await;
|
||||
|
||||
let expected = vec![
|
||||
"+----+----+--------------------------------+",
|
||||
"| f1 | f2 | time |",
|
||||
"+----+----+--------------------------------+",
|
||||
"| 1 | 3 | 1970-01-01T00:00:00.000000100Z |",
|
||||
"+----+----+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(&expected, &results.output);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multi_tag() {
|
||||
// input:
|
||||
|
@ -563,6 +608,105 @@ mod test {
|
|||
assert_batches_eq!(&expected, &results.output);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_string_with_timestamp() {
|
||||
// input:
|
||||
// s | i | time
|
||||
// -------+----+------
|
||||
// "cat" | | 100
|
||||
// | 3 | 100
|
||||
// | 4 | 200
|
||||
// "dog" | | 200
|
||||
//
|
||||
// expected output:
|
||||
//
|
||||
// s | i | time
|
||||
// -------+----+-------
|
||||
// "cat" | 3 | 100
|
||||
// "dog" | 4 | 200
|
||||
let s = StringArray::from(vec![Some("cat"), None, None, Some("dog")]);
|
||||
|
||||
let i = Int64Array::from(vec![None, Some(3), Some(4), None]);
|
||||
|
||||
let time = TimestampNanosecondArray::from(vec![Some(100), Some(100), Some(200), Some(200)]);
|
||||
|
||||
let batch = RecordBatch::try_from_iter(vec![
|
||||
("s", Arc::new(s) as ArrayRef),
|
||||
("i", Arc::new(i) as ArrayRef),
|
||||
("time", Arc::new(time) as ArrayRef),
|
||||
])
|
||||
.unwrap();
|
||||
|
||||
let sort_keys = vec![PhysicalSortExpr {
|
||||
expr: col("time", &batch.schema()).unwrap(),
|
||||
options: SortOptions {
|
||||
descending: false,
|
||||
nulls_first: false,
|
||||
},
|
||||
}];
|
||||
|
||||
let results = dedupe(vec![batch], sort_keys).await;
|
||||
|
||||
let expected = vec![
|
||||
"+-----+---+--------------------------------+",
|
||||
"| s | i | time |",
|
||||
"+-----+---+--------------------------------+",
|
||||
"| cat | 3 | 1970-01-01T00:00:00.000000100Z |",
|
||||
"| dog | 4 | 1970-01-01T00:00:00.000000200Z |",
|
||||
"+-----+---+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(&expected, &results.output);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_last_is_null_with_timestamp() {
|
||||
// input:
|
||||
// s | i | time
|
||||
// -------+----+------
|
||||
// "cat" | | 1639612800000000000
|
||||
// | 10 | 1639612800000000000
|
||||
//
|
||||
// expected output:
|
||||
//
|
||||
// s | i | time
|
||||
// -------+----+-------
|
||||
// "cat" | 10 | 1639612800000000000
|
||||
let s = StringArray::from(vec![Some("cat"), None]);
|
||||
|
||||
let i = Int64Array::from(vec![None, Some(10)]);
|
||||
|
||||
let time = TimestampNanosecondArray::from(vec![
|
||||
Some(1639612800000000000),
|
||||
Some(1639612800000000000),
|
||||
]);
|
||||
|
||||
let batch = RecordBatch::try_from_iter(vec![
|
||||
("s", Arc::new(s) as ArrayRef),
|
||||
("i", Arc::new(i) as ArrayRef),
|
||||
("time", Arc::new(time) as ArrayRef),
|
||||
])
|
||||
.unwrap();
|
||||
|
||||
let sort_keys = vec![PhysicalSortExpr {
|
||||
expr: col("time", &batch.schema()).unwrap(),
|
||||
options: SortOptions {
|
||||
descending: false,
|
||||
nulls_first: true,
|
||||
},
|
||||
}];
|
||||
|
||||
let results = dedupe(vec![batch], sort_keys).await;
|
||||
|
||||
let expected = vec![
|
||||
"+-----+----+----------------------+",
|
||||
"| s | i | time |",
|
||||
"+-----+----+----------------------+",
|
||||
"| cat | 10 | 2021-12-16T00:00:00Z |",
|
||||
"+-----+----+----------------------+",
|
||||
];
|
||||
assert_batches_eq!(&expected, &results.output);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multi_record_batch() {
|
||||
// input:
|
||||
|
|
Loading…
Reference in New Issue