test: tests in the reorg planner and query tests for merging parquet files (#6137)

* test: tests in the reorg planner and query tests for merging parquet files

* fix: use 20 files

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2022-11-15 15:29:44 -05:00 committed by GitHub
parent 2a9d330ef3
commit 20f1ae1c8f
8 changed files with 318 additions and 37 deletions

@@ -270,6 +270,96 @@ mod test {
(schema, vec![chunk1, chunk2])
}
async fn get_sorted_test_chunks() -> (Arc<Schema>, Vec<Arc<dyn QueryChunk>>) {
// Chunk 1
let chunk1 = Arc::new(
TestChunk::new("t")
.with_time_column_with_stats(Some(1000), Some(1000))
.with_tag_column_with_stats("tag1", Some("A"), Some("A"))
.with_i64_field_column("field_int")
.with_one_row_of_specific_data("A", 1, 1000),
) as Arc<dyn QueryChunk>;
let expected = vec![
"+-----------+------+-----------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-----------------------------+",
"| 1 | A | 1970-01-01T00:00:00.000001Z |",
"+-----------+------+-----------------------------+",
];
assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk1)]).await);
// Chunk 2
let chunk2 = Arc::new(
TestChunk::new("t")
.with_time_column_with_stats(Some(2000), Some(2000))
.with_tag_column_with_stats("tag1", Some("B"), Some("B"))
.with_i64_field_column("field_int")
.with_one_row_of_specific_data("B", 2, 2000),
) as Arc<dyn QueryChunk>;
let expected = vec![
"+-----------+------+-----------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-----------------------------+",
"| 2 | B | 1970-01-01T00:00:00.000002Z |",
"+-----------+------+-----------------------------+",
];
assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await);
(chunk1.schema(), vec![chunk1, chunk2])
}
#[tokio::test]
async fn test_compact_plan_sorted() {
test_helpers::maybe_start_logging();
// ensures that the output is actually sorted
// https://github.com/influxdata/influxdb_iox/issues/6125
let (schema, chunks) = get_sorted_test_chunks().await;
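// run the same compact plan with the chunks supplied in both orders;
// the output must come back sorted on (tag1, time) regardless of input order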
let chunk_orders = vec![
// reverse order
vec![Arc::clone(&chunks[1]), Arc::clone(&chunks[0])],
chunks,
];
// executor has only 1 thread
let executor = Executor::new(1);
for chunks in chunk_orders {
let sort_key = SortKeyBuilder::with_capacity(2)
.with_col_opts("tag1", false, true)
.with_col_opts(TIME_COLUMN_NAME, false, true)
.build();
let compact_plan = ReorgPlanner::new(IOxSessionContext::with_testing())
.compact_plan(Arc::from("t"), Arc::clone(&schema), chunks, sort_key)
.expect("created compact plan");
let physical_plan = executor
.new_context(ExecutorType::Reorg)
.create_physical_plan(&compact_plan)
.await
.unwrap();
let batches = test_collect(physical_plan).await;
// should be sorted on tag1 then timestamp
let expected = vec![
"+-----------+------+-----------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-----------------------------+",
"| 1 | A | 1970-01-01T00:00:00.000001Z |",
"| 2 | B | 1970-01-01T00:00:00.000002Z |",
"+-----------+------+-----------------------------+",
];
assert_batches_eq!(&expected, &batches);
}
executor.join().await;
}
#[tokio::test]
async fn test_compact_plan() {
test_helpers::maybe_start_logging();


@@ -591,6 +591,7 @@ impl TestChunk {
/// Prepares this chunk to return a specific record batch with one
/// row of non null data.
/// tag: MA
pub fn with_one_row_of_data(mut self) -> Self {
// create arrays
let columns = self
@@ -623,6 +624,44 @@ impl TestChunk {
self
}
/// Prepares this chunk to return a specific record batch with a single row built from the given tag, field and timestamp values.
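/// For example, on a chunk declared with a `tag1` tag column, an i64 `field_int`
/// field and a time column, `with_one_row_of_specific_data("A", 1, 1000)` yields
/// the single row `field_int=1, tag1="A", time=1000ns`.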
pub fn with_one_row_of_specific_data(
mut self,
tag_val: impl AsRef<str>,
field_val: i64,
ts_val: i64,
) -> Self {
// create arrays
let columns = self
.schema
.iter()
.map(|(_influxdb_column_type, field)| match field.data_type() {
DataType::Int64 => Arc::new(Int64Array::from(vec![field_val])) as ArrayRef,
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
Arc::new(TimestampNanosecondArray::from(vec![ts_val])) as ArrayRef
}
DataType::Dictionary(key, value)
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
{
let dict: DictionaryArray<Int32Type> =
vec![tag_val.as_ref()].into_iter().collect();
Arc::new(dict) as ArrayRef
}
_ => unimplemented!(
"Unimplemented data type for test database: {:?}",
field.data_type()
),
})
.collect::<Vec<_>>();
let batch =
RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
println!("TestChunk batch data: {:#?}", batch);
self.table_data.push(Arc::new(batch));
self
}
/// Prepares this chunk to return a specific record batch with three
/// rows of non null data (no duplicates within the chunk) that look like:
/// "+------+------+-----------+-------------------------------+",

@@ -0,0 +1,38 @@
-- Test Setup: TwentySortedParquetFiles
-- SQL: select count(*), sum(f) from m;
+-----------------+----------+
| COUNT(UInt8(1)) | SUM(m.f) |
+-----------------+----------+
| 21 | 33 |
+-----------------+----------+
-- SQL: EXPLAIN select count(*), sum(f) from m;
-- Results After Normalizing UUIDs
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: COUNT(UInt8(1)), SUM(m.f) |
| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), SUM(m.f)]] |
| | TableScan: m projection=[f] |
| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1)), SUM(m.f)@1 as SUM(m.f)] |
| | AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1)), SUM(m.f)] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1)), SUM(m.f)] |
| | UnionExec |
| | ProjectionExec: expr=[f@0 as f] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [tag@1 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [tag@1 ASC,time@2 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], projection=[f, tag, time] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet], projection=[f, tag, time] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet], projection=[f, tag, time] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000003.parquet], projection=[f, tag, time] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000004.parquet], projection=[f, tag, time] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000005.parquet], projection=[f, tag, time] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000006.parquet], projection=[f, tag, time] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000007.parquet], projection=[f, tag, time] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000008.parquet], projection=[f, tag, time] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000009.parquet], projection=[f, tag, time] |
| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-00000000000a.parquet, 1/1/1/1/00000000-0000-0000-0000-00000000000b.parquet, 1/1/1/1/00000000-0000-0000-0000-00000000000c.parquet, 1/1/1/1/00000000-0000-0000-0000-00000000000d.parquet, 1/1/1/1/00000000-0000-0000-0000-00000000000e.parquet, 1/1/1/1/00000000-0000-0000-0000-00000000000f.parquet, 1/1/1/1/00000000-0000-0000-0000-000000000010.parquet, 1/1/1/1/00000000-0000-0000-0000-000000000011.parquet, 1/1/1/1/00000000-0000-0000-0000-000000000012.parquet, 1/1/1/1/00000000-0000-0000-0000-000000000013.parquet], projection=[f] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

@@ -0,0 +1,11 @@
-- Test setup for running with 20 parquet files
-- IOX_SETUP: TwentySortedParquetFiles
-- 10 of the parquet files have 2 unique rows each, one with f=1 and the other with f=2,
-- and the other 10 each have a single row with f=3 that is duplicated across all of them
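-- After deduplication the 10 identical f=3 rows collapse to a single row, so the
-- expected results are count(*) = 10*2 + 1 = 21 and sum(f) = 10*1 + 10*2 + 3 = 33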
select count(*), sum(f) from m;
-- Use sum to avoid the count(*) optimization
-- IOX_COMPARE: uuid
EXPLAIN select count(*), sum(f) from m;

@@ -1,7 +1,8 @@
//! This file is auto generated by query_tests/generate.
//! Do not edit manually --> will result in sadness
use crate::runner::Runner;
use std::path::Path;
use crate::runner::Runner;
#[tokio::test]
// Tests from "basic.sql",
@@ -10,8 +11,13 @@ async fn test_cases_basic_sql() {
let input_path = Path::new("cases").join("in").join("basic.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
@@ -19,12 +25,15 @@ async fn test_cases_basic_sql() {
async fn test_cases_duplicates_ingester_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases")
.join("in")
.join("duplicates_ingester.sql");
let input_path = Path::new("cases").join("in").join("duplicates_ingester.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
@@ -34,8 +43,29 @@ async fn test_cases_duplicates_parquet_sql() {
let input_path = Path::new("cases").join("in").join("duplicates_parquet.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "duplicates_parquet_many.sql",
async fn test_cases_duplicates_parquet_many_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases").join("in").join("duplicates_parquet_many.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
@@ -43,12 +73,15 @@ async fn test_cases_duplicates_parquet_sql() {
async fn test_cases_new_sql_system_tables_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases")
.join("in")
.join("new_sql_system_tables.sql");
let input_path = Path::new("cases").join("in").join("new_sql_system_tables.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
@@ -58,8 +91,13 @@ async fn test_cases_pushdown_sql() {
let input_path = Path::new("cases").join("in").join("pushdown.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
@@ -69,8 +107,13 @@ async fn test_cases_selectors_sql() {
let input_path = Path::new("cases").join("in").join("selectors.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
@@ -80,8 +123,13 @@ async fn test_cases_several_chunks_sql() {
let input_path = Path::new("cases").join("in").join("several_chunks.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
@@ -89,12 +137,15 @@ async fn test_cases_several_chunks_sql() {
async fn test_cases_sql_information_schema_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases")
.join("in")
.join("sql_information_schema.sql");
let input_path = Path::new("cases").join("in").join("sql_information_schema.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
@@ -104,8 +155,13 @@ async fn test_cases_timestamps_sql() {
let input_path = Path::new("cases").join("in").join("timestamps.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
@@ -115,8 +171,13 @@ async fn test_cases_two_chunks_sql() {
let input_path = Path::new("cases").join("in").join("two_chunks.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
@@ -124,10 +185,13 @@ async fn test_cases_two_chunks_sql() {
async fn test_cases_two_chunks_missing_columns_sql() {
test_helpers::maybe_start_logging();
let input_path = Path::new("cases")
.join("in")
.join("two_chunks_missing_columns.sql");
let input_path = Path::new("cases").join("in").join("two_chunks_missing_columns.sql");
let mut runner = Runner::new();
runner.run(input_path).await.expect("test failed");
runner.flush().expect("flush worked");
}
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}

@@ -55,6 +55,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
register_setup!(OneMeasurementFourChunksWithDuplicates),
register_setup!(OneMeasurementFourChunksWithDuplicatesParquetOnly),
register_setup!(OneMeasurementFourChunksWithDuplicatesWithIngester),
register_setup!(TwentySortedParquetFiles),
register_setup!(ThreeDeleteThreeChunks),
register_setup!(OneDeleteSimpleExprOneChunkDeleteAll),
register_setup!(OneDeleteSimpleExprOneChunk),

@@ -534,6 +534,44 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicatesWithIngester {
}
}
/// Setup with 20 parquet files, some containing duplicated rows and some
/// containing only unique rows. The idea here is to verify that merging them
/// together produces the correct values.
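/// The even-numbered chunks each hold two unique rows (one with f=1 and one
/// with f=2, at a chunk-specific timestamp), while the odd-numbered chunks all
/// hold the identical row `m,tag=A f=3 2001`, which should deduplicate to a
/// single row when the files are merged.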
#[derive(Debug)]
pub struct TwentySortedParquetFiles {}
#[async_trait]
impl DbSetup for TwentySortedParquetFiles {
async fn make(&self) -> Vec<DbScenario> {
let lp_data: Vec<_> = (0..20)
.map(|i| {
if i % 2 == 0 {
vec![
format!("m,tag=A f=1 {}", 1000 - i), // unique in this chunk
format!("m,tab=B f=2 {}", 1000 - i), // unique in this chunk (not plus i!)
]
} else {
vec![
format!("m,tag=A f=3 2001"), // duplicated across all chunks
]
}
})
.collect();
let partition_key = "1970-01-01T00";
let chunk_data: Vec<_> = lp_data
.iter()
.map(|lp_lines| ChunkData {
lp_lines: lp_lines.iter().map(|s| s.as_str()).collect(),
partition_key,
chunk_stage: Some(ChunkStage::Parquet),
..Default::default()
})
.collect();
make_n_chunks_scenario(&chunk_data).await
}
}
#[derive(Debug)]
pub struct OneMeasurementManyFields {}
#[async_trait]

@@ -47,7 +47,7 @@ use trace::{
// Structs, enums, and functions used to exhaust all test scenarios of chunk lifecycle
// & when delete predicates are applied
// STRUCTs & ENUMs
/// Describes a Chunk that should be created for a test
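/// Deriving `Default` lets test setups specify only the fields they need
/// (e.g. via `..Default::default()` when building scenarios).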
#[derive(Debug, Clone, Default)]
pub struct ChunkData<'a, 'b> {
/// Line protocol data of this chunk