diff --git a/iox_query/src/frontend/reorg.rs b/iox_query/src/frontend/reorg.rs index 1b1ce23c2d..34c63d8b74 100644 --- a/iox_query/src/frontend/reorg.rs +++ b/iox_query/src/frontend/reorg.rs @@ -270,6 +270,96 @@ mod test { (schema, vec![chunk1, chunk2]) } + async fn get_sorted_test_chunks() -> (Arc, Vec>) { + // Chunk 1 + let chunk1 = Arc::new( + TestChunk::new("t") + .with_time_column_with_stats(Some(1000), Some(1000)) + .with_tag_column_with_stats("tag1", Some("A"), Some("A")) + .with_i64_field_column("field_int") + .with_one_row_of_specific_data("A", 1, 1000), + ) as Arc; + + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1 | A | 1970-01-01T00:00:00.000001Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk1)]).await); + + // Chunk 2 + let chunk2 = Arc::new( + TestChunk::new("t") + .with_time_column_with_stats(Some(2000), Some(2000)) + .with_tag_column_with_stats("tag1", Some("B"), Some("B")) + .with_i64_field_column("field_int") + .with_one_row_of_specific_data("B", 2, 2000), + ) as Arc; + + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 2 | B | 1970-01-01T00:00:00.000002Z |", + "+-----------+------+-----------------------------+", + ]; + assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await); + + (chunk1.schema(), vec![chunk1, chunk2]) + } + + #[tokio::test] + async fn test_compact_plan_sorted() { + test_helpers::maybe_start_logging(); + + // ensures that the output is actually sorted + // https://github.com/influxdata/influxdb_iox/issues/6125 + let (schema, chunks) = get_sorted_test_chunks().await; + + let chunk_orders = vec![ + // reverse order + vec![Arc::clone(&chunks[1]), Arc::clone(&chunks[0])], + chunks, + ]; + + // executor has only 1 thread + let executor = Executor::new(1); + for chunks in chunk_orders { + let sort_key = SortKeyBuilder::with_capacity(2) + .with_col_opts("tag1", false, true) + .with_col_opts(TIME_COLUMN_NAME, false, true) + .build(); + + let compact_plan = ReorgPlanner::new(IOxSessionContext::with_testing()) + .compact_plan(Arc::from("t"), Arc::clone(&schema), chunks, sort_key) + .expect("created compact plan"); + + let physical_plan = executor + .new_context(ExecutorType::Reorg) + .create_physical_plan(&compact_plan) + .await + .unwrap(); + + let batches = test_collect(physical_plan).await; + + // should be sorted on tag1 then timestamp + let expected = vec![ + "+-----------+------+-----------------------------+", + "| field_int | tag1 | time |", + "+-----------+------+-----------------------------+", + "| 1 | A | 1970-01-01T00:00:00.000001Z |", + "| 2 | B | 1970-01-01T00:00:00.000002Z |", + "+-----------+------+-----------------------------+", + ]; + + assert_batches_eq!(&expected, &batches); + } + + executor.join().await; + } + #[tokio::test] async fn test_compact_plan() { test_helpers::maybe_start_logging(); diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index 6ecdc32291..b189f0565a 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -591,6 +591,7 @@ impl TestChunk { /// Prepares this chunk to return a specific record batch with one /// row of non null data. + /// tag: MA pub fn with_one_row_of_data(mut self) -> Self { // create arrays let columns = self @@ -623,6 +624,44 @@ impl TestChunk { self } + /// Prepares this chunk to return a specific record batch with a single tag, field and timestamp like + pub fn with_one_row_of_specific_data( + mut self, + tag_val: impl AsRef, + field_val: i64, + ts_val: i64, + ) -> Self { + // create arrays + let columns = self + .schema + .iter() + .map(|(_influxdb_column_type, field)| match field.data_type() { + DataType::Int64 => Arc::new(Int64Array::from(vec![field_val])) as ArrayRef, + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + Arc::new(TimestampNanosecondArray::from(vec![ts_val])) as ArrayRef + } + DataType::Dictionary(key, value) + if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => + { + let dict: DictionaryArray = + vec![tag_val.as_ref()].into_iter().collect(); + Arc::new(dict) as ArrayRef + } + _ => unimplemented!( + "Unimplemented data type for test database: {:?}", + field.data_type() + ), + }) + .collect::>(); + + let batch = + RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch"); + println!("TestChunk batch data: {:#?}", batch); + + self.table_data.push(Arc::new(batch)); + self + } + /// Prepares this chunk to return a specific record batch with three /// rows of non null data that look like, no duplicates within /// "+------+------+-----------+-------------------------------+", diff --git a/query_tests/cases/in/duplicates_parquet_many.expected b/query_tests/cases/in/duplicates_parquet_many.expected new file mode 100644 index 0000000000..3de03d3c29 --- /dev/null +++ b/query_tests/cases/in/duplicates_parquet_many.expected @@ -0,0 +1,38 @@ +-- Test Setup: TwentySortedParquetFiles +-- SQL: select count(*), sum(f) from m; ++-----------------+----------+ +| COUNT(UInt8(1)) | SUM(m.f) | ++-----------------+----------+ +| 21 | 33 | ++-----------------+----------+ +-- SQL: EXPLAIN select count(*), sum(f) from m; +-- Results After Normalizing UUIDs ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: COUNT(UInt8(1)), SUM(m.f) | +| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1)), SUM(m.f)]] | +| | TableScan: m projection=[f] | +| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1)), SUM(m.f)@1 as SUM(m.f)] | +| | AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1)), SUM(m.f)] | +| | CoalescePartitionsExec | +| | AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1)), SUM(m.f)] | +| | UnionExec | +| | ProjectionExec: expr=[f@0 as f] | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | DeduplicateExec: [tag@1 ASC,time@2 ASC] | +| | SortPreservingMergeExec: [tag@1 ASC,time@2 ASC] | +| | UnionExec | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet], projection=[f, tag, time] | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet], projection=[f, tag, time] | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet], projection=[f, tag, time] | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000003.parquet], projection=[f, tag, time] | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000004.parquet], projection=[f, tag, time] | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000005.parquet], projection=[f, tag, time] | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000006.parquet], projection=[f, tag, time] | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000007.parquet], projection=[f, tag, time] | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000008.parquet], projection=[f, tag, time] | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-000000000009.parquet], projection=[f, tag, time] | +| | ParquetExec: limit=None, partitions=[1/1/1/1/00000000-0000-0000-0000-00000000000a.parquet, 1/1/1/1/00000000-0000-0000-0000-00000000000b.parquet, 1/1/1/1/00000000-0000-0000-0000-00000000000c.parquet, 1/1/1/1/00000000-0000-0000-0000-00000000000d.parquet, 1/1/1/1/00000000-0000-0000-0000-00000000000e.parquet, 1/1/1/1/00000000-0000-0000-0000-00000000000f.parquet, 1/1/1/1/00000000-0000-0000-0000-000000000010.parquet, 1/1/1/1/00000000-0000-0000-0000-000000000011.parquet, 1/1/1/1/00000000-0000-0000-0000-000000000012.parquet, 1/1/1/1/00000000-0000-0000-0000-000000000013.parquet], projection=[f] | +| | | ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/query_tests/cases/in/duplicates_parquet_many.sql b/query_tests/cases/in/duplicates_parquet_many.sql new file mode 100644 index 0000000000..8c064350f0 --- /dev/null +++ b/query_tests/cases/in/duplicates_parquet_many.sql @@ -0,0 +1,11 @@ +-- Test setup for running with 100 parquet files +-- IOX_SETUP: TwentySortedParquetFiles + + +-- each parquet file has either 2 rows, one with f=1 and the other with f=2 +-- and then there are 50 that have a single row with f=3 +select count(*), sum(f) from m; + +-- Use sum to avoid count(*) otimization +-- IOX_COMPARE: uuid +EXPLAIN select count(*), sum(f) from m; diff --git a/query_tests/src/cases.rs b/query_tests/src/cases.rs index 69caf0dfe5..89bb5032ee 100644 --- a/query_tests/src/cases.rs +++ b/query_tests/src/cases.rs @@ -1,7 +1,8 @@ + //! This file is auto generated by query_tests/generate. //! Do not edit manually --> will result in sadness -use crate::runner::Runner; use std::path::Path; +use crate::runner::Runner; #[tokio::test] // Tests from "basic.sql", @@ -10,8 +11,13 @@ async fn test_cases_basic_sql() { let input_path = Path::new("cases").join("in").join("basic.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } #[tokio::test] @@ -19,12 +25,15 @@ async fn test_cases_basic_sql() { async fn test_cases_duplicates_ingester_sql() { test_helpers::maybe_start_logging(); - let input_path = Path::new("cases") - .join("in") - .join("duplicates_ingester.sql"); + let input_path = Path::new("cases").join("in").join("duplicates_ingester.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } #[tokio::test] @@ -34,8 +43,29 @@ async fn test_cases_duplicates_parquet_sql() { let input_path = Path::new("cases").join("in").join("duplicates_parquet.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); +} + +#[tokio::test] +// Tests from "duplicates_parquet_many.sql", +async fn test_cases_duplicates_parquet_many_sql() { + test_helpers::maybe_start_logging(); + + let input_path = Path::new("cases").join("in").join("duplicates_parquet_many.sql"); + let mut runner = Runner::new(); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } #[tokio::test] @@ -43,12 +73,15 @@ async fn test_cases_duplicates_parquet_sql() { async fn test_cases_new_sql_system_tables_sql() { test_helpers::maybe_start_logging(); - let input_path = Path::new("cases") - .join("in") - .join("new_sql_system_tables.sql"); + let input_path = Path::new("cases").join("in").join("new_sql_system_tables.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } #[tokio::test] @@ -58,8 +91,13 @@ async fn test_cases_pushdown_sql() { let input_path = Path::new("cases").join("in").join("pushdown.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } #[tokio::test] @@ -69,8 +107,13 @@ async fn test_cases_selectors_sql() { let input_path = Path::new("cases").join("in").join("selectors.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } #[tokio::test] @@ -80,8 +123,13 @@ async fn test_cases_several_chunks_sql() { let input_path = Path::new("cases").join("in").join("several_chunks.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } #[tokio::test] @@ -89,12 +137,15 @@ async fn test_cases_several_chunks_sql() { async fn test_cases_sql_information_schema_sql() { test_helpers::maybe_start_logging(); - let input_path = Path::new("cases") - .join("in") - .join("sql_information_schema.sql"); + let input_path = Path::new("cases").join("in").join("sql_information_schema.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } #[tokio::test] @@ -104,8 +155,13 @@ async fn test_cases_timestamps_sql() { let input_path = Path::new("cases").join("in").join("timestamps.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } #[tokio::test] @@ -115,8 +171,13 @@ async fn test_cases_two_chunks_sql() { let input_path = Path::new("cases").join("in").join("two_chunks.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); } #[tokio::test] @@ -124,10 +185,13 @@ async fn test_cases_two_chunks_sql() { async fn test_cases_two_chunks_missing_columns_sql() { test_helpers::maybe_start_logging(); - let input_path = Path::new("cases") - .join("in") - .join("two_chunks_missing_columns.sql"); + let input_path = Path::new("cases").join("in").join("two_chunks_missing_columns.sql"); let mut runner = Runner::new(); - runner.run(input_path).await.expect("test failed"); - runner.flush().expect("flush worked"); -} + runner + .run(input_path) + .await + .expect("test failed"); + runner + .flush() + .expect("flush worked"); +} \ No newline at end of file diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index d36511e804..a5604c58d9 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -55,6 +55,7 @@ pub fn get_all_setups() -> &'static HashMap> { register_setup!(OneMeasurementFourChunksWithDuplicates), register_setup!(OneMeasurementFourChunksWithDuplicatesParquetOnly), register_setup!(OneMeasurementFourChunksWithDuplicatesWithIngester), + register_setup!(TwentySortedParquetFiles), register_setup!(ThreeDeleteThreeChunks), register_setup!(OneDeleteSimpleExprOneChunkDeleteAll), register_setup!(OneDeleteSimpleExprOneChunk), diff --git a/query_tests/src/scenarios/library.rs b/query_tests/src/scenarios/library.rs index 8f4ba31ea9..31ce57eb6b 100644 --- a/query_tests/src/scenarios/library.rs +++ b/query_tests/src/scenarios/library.rs @@ -534,6 +534,44 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicatesWithIngester { } } +/// Setup with 20 parquet files, some with duplicated and some without +/// duplicated tags. The idea here is to verify that merging them +/// together produces the correct values +#[derive(Debug)] +pub struct TwentySortedParquetFiles {} +#[async_trait] +impl DbSetup for TwentySortedParquetFiles { + async fn make(&self) -> Vec { + let lp_data: Vec<_> = (0..20) + .map(|i| { + if i % 2 == 0 { + vec![ + format!("m,tag=A f=1 {}", 1000 - i), // unique in this chunk + format!("m,tab=B f=2 {}", 1000 - i), // unique in this chunk (not plus i!) + ] + } else { + vec![ + format!("m,tag=A f=3 2001"), // duplicated across all chunks + ] + } + }) + .collect(); + + let partition_key = "1970-01-01T00"; + let chunk_data: Vec<_> = lp_data + .iter() + .map(|lp_lines| ChunkData { + lp_lines: lp_lines.iter().map(|s| s.as_str()).collect(), + partition_key, + chunk_stage: Some(ChunkStage::Parquet), + ..Default::default() + }) + .collect(); + + make_n_chunks_scenario(&chunk_data).await + } +} + #[derive(Debug)] pub struct OneMeasurementManyFields {} #[async_trait] diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index f28efa823d..60617f5614 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -47,7 +47,7 @@ use trace::{ // Structs, enums, and functions used to exhaust all test scenarios of chunk lifecycle // & when delete predicates are applied -// STRUCTs & ENUMs +/// Describes a Chunk that should be created for a test #[derive(Debug, Clone, Default)] pub struct ChunkData<'a, 'b> { /// Line protocol data of this chunk