diff --git a/query_tests/cases/in/chunk_order.expected b/query_tests/cases/in/chunk_order.expected deleted file mode 100644 index 4c35e0b7a9..0000000000 --- a/query_tests/cases/in/chunk_order.expected +++ /dev/null @@ -1,7 +0,0 @@ --- Test Setup: ChunkOrder --- SQL: SELECT * from cpu order by time; -+--------+--------------------------------+------+ -| region | time | user | -+--------+--------------------------------+------+ -| west | 1970-01-01T00:00:00.000000100Z | 2 | -+--------+--------------------------------+------+ diff --git a/query_tests/cases/in/chunk_order.sql b/query_tests/cases/in/chunk_order.sql deleted file mode 100644 index 45384762dc..0000000000 --- a/query_tests/cases/in/chunk_order.sql +++ /dev/null @@ -1,5 +0,0 @@ --- Test that deduplication respects chunk ordering --- IOX_SETUP: ChunkOrder - --- query data -SELECT * from cpu order by time; diff --git a/query_tests/cases/in/old_duplicates.expected b/query_tests/cases/in/duplicates.expected similarity index 99% rename from query_tests/cases/in/old_duplicates.expected rename to query_tests/cases/in/duplicates.expected index 4d30b8cc77..6a276e1b4c 100644 --- a/query_tests/cases/in/old_duplicates.expected +++ b/query_tests/cases/in/duplicates.expected @@ -1,4 +1,4 @@ --- Test Setup: OldOneMeasurementFourChunksWithDuplicates +-- Test Setup: OneMeasurementFourChunksWithDuplicates -- SQL: explain select time, state, city, min_temp, max_temp, area from h2o order by time, state, city; +---------------+---------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | diff --git a/query_tests/cases/in/old_duplicates.sql b/query_tests/cases/in/duplicates.sql similarity index 86% rename from query_tests/cases/in/old_duplicates.sql rename to query_tests/cases/in/duplicates.sql index ad7054ce94..a7da9a28de 100644 --- a/query_tests/cases/in/old_duplicates.sql +++ b/query_tests/cases/in/duplicates.sql @@ -1,5 +1,5 @@ -- Test for predicate push down explains --- IOX_SETUP: OldOneMeasurementFourChunksWithDuplicates +-- IOX_SETUP: OneMeasurementFourChunksWithDuplicates -- Plan with order by explain select time, state, city, min_temp, max_temp, area from h2o order by time, state, city; diff --git a/query_tests/cases/in/no_stats_plans.expected b/query_tests/cases/in/no_stats_plans.expected deleted file mode 100644 index 6897aa59ae..0000000000 --- a/query_tests/cases/in/no_stats_plans.expected +++ /dev/null @@ -1,13 +0,0 @@ --- Test Setup: OldTwoMeasurementsManyFieldsOneRubChunk --- SQL: EXPLAIN SELECT count(*) from h2o; -+---------------+-------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------+ -| logical_plan | Projection: #COUNT(UInt8(1)) | -| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] | -| | TableScan: h2o projection=Some([0]) | -| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] | -| | ProjectionExec: expr=[3 as COUNT(UInt8(1))] | -| | EmptyExec: produce_one_row=true | -| | | -+---------------+-------------------------------------------------------------+ diff --git a/query_tests/cases/in/no_stats_plans.sql b/query_tests/cases/in/no_stats_plans.sql deleted file mode 100644 index 0fddf57798..0000000000 --- a/query_tests/cases/in/no_stats_plans.sql +++ /dev/null @@ -1,10 +0,0 @@ --- Demonstrate plans that are not optimized using statistics --- IOX_SETUP: OldTwoMeasurementsManyFieldsOneRubChunk - --- This plan should scan data as it reads from chunk with delete predicates -EXPLAIN SELECT count(*) from h2o; - --- NOTE: This test should have "IOX_SETUP: ThreeDeleteThreeChunks" but becasue of Bug: https://github.com/influxdata/influxdb_iox/issues/2745 --- make it OldTwoMeasurementsManyFieldsOneRubChunk --- Also, the query should be "EXPLAIN SELECT count(*) from cpu;" - diff --git a/query_tests/cases/in/old_sql_system_tables.expected b/query_tests/cases/in/old_sql_system_tables.expected deleted file mode 100644 index 47153dfecb..0000000000 --- a/query_tests/cases/in/old_sql_system_tables.expected +++ /dev/null @@ -1,37 +0,0 @@ --- Test Setup: OldTwoMeasurementsManyFieldsOneRubChunk --- SQL: SELECT * from information_schema.tables where table_schema = 'system'; --- Results After Sorting -+---------------+--------------+---------------------+------------+ -| table_catalog | table_schema | table_name | table_type | -+---------------+--------------+---------------------+------------+ -| public | system | chunk_columns | BASE TABLE | -| public | system | chunks | BASE TABLE | -| public | system | columns | BASE TABLE | -| public | system | operations | BASE TABLE | -| public | system | persistence_windows | BASE TABLE | -| public | system | queries | BASE TABLE | -+---------------+--------------+---------------------+------------+ --- SQL: SELECT partition_key, table_name, storage, memory_bytes, row_count from system.chunks; --- Results After Sorting -+---------------+------------+-------------------+--------------+-----------+ -| partition_key | table_name | storage | memory_bytes | row_count | -+---------------+------------+-------------------+--------------+-----------+ -| 1970-01-01T00 | h2o | ReadBuffer | 4456 | 3 | -| 1970-01-01T00 | o2 | OpenMutableBuffer | 1867 | 2 | -+---------------+------------+-------------------+--------------+-----------+ --- SQL: SELECT * from system.columns; --- Results After Sorting -+---------------+------------+-------------+-------------+---------------+ -| partition_key | table_name | column_name | column_type | influxdb_type | -+---------------+------------+-------------+-------------+---------------+ -| 1970-01-01T00 | h2o | city | String | Tag | -| 1970-01-01T00 | h2o | other_temp | F64 | Field | -| 1970-01-01T00 | h2o | state | String | Tag | -| 1970-01-01T00 | h2o | temp | F64 | Field | -| 1970-01-01T00 | h2o | time | I64 | Timestamp | -| 1970-01-01T00 | o2 | city | String | Tag | -| 1970-01-01T00 | o2 | reading | F64 | Field | -| 1970-01-01T00 | o2 | state | String | Tag | -| 1970-01-01T00 | o2 | temp | F64 | Field | -| 1970-01-01T00 | o2 | time | I64 | Timestamp | -+---------------+------------+-------------+-------------+---------------+ diff --git a/query_tests/cases/in/old_sql_system_tables.sql b/query_tests/cases/in/old_sql_system_tables.sql deleted file mode 100644 index 68a17f0d69..0000000000 --- a/query_tests/cases/in/old_sql_system_tables.sql +++ /dev/null @@ -1,19 +0,0 @@ --- IOX_SETUP: OldTwoMeasurementsManyFieldsOneRubChunk - --- --- system tables reflect the state of chunks, so don't run them --- with different chunk configurations. --- - --- validate we have access to information schema for listing system tables --- IOX_COMPARE: sorted -SELECT * from information_schema.tables where table_schema = 'system'; - --- ensures the tables / plumbing are hooked up (so no need to --- test timestamps, etc) --- IOX_COMPARE: sorted -SELECT partition_key, table_name, storage, memory_bytes, row_count from system.chunks; - - --- IOX_COMPARE: sorted -SELECT * from system.columns; diff --git a/query_tests/cases/in/old_sql_system_tables2.expected b/query_tests/cases/in/old_sql_system_tables2.expected deleted file mode 100644 index 0f095baf1e..0000000000 --- a/query_tests/cases/in/old_sql_system_tables2.expected +++ /dev/null @@ -1,21 +0,0 @@ --- Test Setup: OldTwoMeasurementsManyFieldsTwoChunks --- SQL: SELECT partition_key, table_name, column_name, storage, row_count, null_count, min_value, max_value, memory_bytes from system.chunk_columns; --- Results After Sorting -+---------------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+ -| partition_key | table_name | column_name | storage | row_count | null_count | min_value | max_value | memory_bytes | -+---------------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+ -| 1970-01-01T00 | h2o | city | OpenMutableBuffer | 1 | 0 | Boston | Boston | 317 | -| 1970-01-01T00 | h2o | city | ReadBuffer | 2 | 0 | Boston | Boston | 327 | -| 1970-01-01T00 | h2o | other_temp | OpenMutableBuffer | 1 | 0 | 72.4 | 72.4 | 305 | -| 1970-01-01T00 | h2o | other_temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 471 | -| 1970-01-01T00 | h2o | state | OpenMutableBuffer | 1 | 0 | CA | CA | 317 | -| 1970-01-01T00 | h2o | state | ReadBuffer | 2 | 0 | MA | MA | 315 | -| 1970-01-01T00 | h2o | temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 471 | -| 1970-01-01T00 | h2o | time | OpenMutableBuffer | 1 | 0 | 350 | 350 | 305 | -| 1970-01-01T00 | h2o | time | ReadBuffer | 2 | 0 | 50 | 250 | 110 | -| 1970-01-01T00 | o2 | city | OpenMutableBuffer | 2 | 1 | Boston | Boston | 317 | -| 1970-01-01T00 | o2 | reading | OpenMutableBuffer | 2 | 1 | 51 | 51 | 305 | -| 1970-01-01T00 | o2 | state | OpenMutableBuffer | 2 | 0 | CA | MA | 321 | -| 1970-01-01T00 | o2 | temp | OpenMutableBuffer | 2 | 0 | 53.4 | 79 | 305 | -| 1970-01-01T00 | o2 | time | OpenMutableBuffer | 2 | 0 | 50 | 300 | 305 | -+---------------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+ diff --git a/query_tests/cases/in/old_sql_system_tables2.sql b/query_tests/cases/in/old_sql_system_tables2.sql deleted file mode 100644 index 8d283d91ad..0000000000 --- a/query_tests/cases/in/old_sql_system_tables2.sql +++ /dev/null @@ -1,12 +0,0 @@ --- IOX_SETUP: OldTwoMeasurementsManyFieldsTwoChunks - --- --- system tables reflect the state of chunks, so don't run them --- with different chunk configurations. --- - - --- ensures the tables / plumbing are hooked up (so no need to --- test timestamps, etc) --- IOX_COMPARE: sorted -SELECT partition_key, table_name, column_name, storage, row_count, null_count, min_value, max_value, memory_bytes from system.chunk_columns; diff --git a/query_tests/cases/in/old_stats_plans.expected b/query_tests/cases/in/old_stats_plans.expected deleted file mode 100644 index 627c134402..0000000000 --- a/query_tests/cases/in/old_stats_plans.expected +++ /dev/null @@ -1,31 +0,0 @@ --- Test Setup: OldTwoMeasurementsManyFieldsOneRubChunk --- SQL: EXPLAIN SELECT count(*) from h2o; -+---------------+-------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------+ -| logical_plan | Projection: #COUNT(UInt8(1)) | -| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] | -| | TableScan: h2o projection=Some([0]) | -| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] | -| | ProjectionExec: expr=[3 as COUNT(UInt8(1))] | -| | EmptyExec: produce_one_row=true | -| | | -+---------------+-------------------------------------------------------------+ --- SQL: EXPLAIN SELECT count(*) from h2o where temp > 70.0 and temp < 72.0; -+---------------+---------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: #COUNT(UInt8(1)) | -| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] | -| | Filter: #h2o.temp > Float64(70) AND #h2o.temp < Float64(72) | -| | TableScan: h2o projection=Some([3]), partial_filters=[#h2o.temp > Float64(70), #h2o.temp < Float64(72)] | -| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] | -| | HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))] | -| | CoalescePartitionsExec | -| | HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))] | -| | CoalesceBatchesExec: target_batch_size=500 | -| | FilterExec: temp@0 > 70 AND temp@0 < 72 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate exprs: [#temp > Float64(70), #temp < Float64(72)] | -| | | -+---------------+---------------------------------------------------------------------------------------------------------------------------------+ diff --git a/query_tests/cases/in/old_stats_plans.sql b/query_tests/cases/in/old_stats_plans.sql deleted file mode 100644 index f6b4c63d50..0000000000 --- a/query_tests/cases/in/old_stats_plans.sql +++ /dev/null @@ -1,9 +0,0 @@ --- Demonstrate plans that are optimized using statistics --- IOX_SETUP: OldTwoMeasurementsManyFieldsOneRubChunk - --- This plan should not scan data as it reads from a RUB chunk (no duplicate) and no soft deleted data -EXPLAIN SELECT count(*) from h2o; - --- However, this plan will still need to scan data given the predicate -EXPLAIN SELECT count(*) from h2o where temp > 70.0 and temp < 72.0; - diff --git a/query_tests/src/cases.rs b/query_tests/src/cases.rs index e8e6c48506..31fdaa49d5 100644 --- a/query_tests/src/cases.rs +++ b/query_tests/src/cases.rs @@ -32,20 +32,6 @@ async fn test_cases_basic_sql() { .expect("flush worked"); } -#[tokio::test] -// Tests from "chunk_order.sql", -async fn test_cases_chunk_order_sql() { - let input_path = Path::new("cases").join("in").join("chunk_order.sql"); - let mut runner = Runner::new(); - runner - .run(input_path) - .await - .expect("test failed"); - runner - .flush() - .expect("flush worked"); -} - #[tokio::test] // Tests from "delete_all.sql", async fn test_cases_delete_all_sql() { @@ -131,65 +117,9 @@ async fn test_cases_new_sql_system_tables_sql() { } #[tokio::test] -// Tests from "no_stats_plans.sql", -async fn test_cases_no_stats_plans_sql() { - let input_path = Path::new("cases").join("in").join("no_stats_plans.sql"); - let mut runner = Runner::new(); - runner - .run(input_path) - .await - .expect("test failed"); - runner - .flush() - .expect("flush worked"); -} - -#[tokio::test] -// Tests from "old_duplicates.sql", -async fn test_cases_old_duplicates_sql() { - let input_path = Path::new("cases").join("in").join("old_duplicates.sql"); - let mut runner = Runner::new(); - runner - .run(input_path) - .await - .expect("test failed"); - runner - .flush() - .expect("flush worked"); -} - -#[tokio::test] -// Tests from "old_sql_system_tables.sql", -async fn test_cases_old_sql_system_tables_sql() { - let input_path = Path::new("cases").join("in").join("old_sql_system_tables.sql"); - let mut runner = Runner::new(); - runner - .run(input_path) - .await - .expect("test failed"); - runner - .flush() - .expect("flush worked"); -} - -#[tokio::test] -// Tests from "old_sql_system_tables2.sql", -async fn test_cases_old_sql_system_tables2_sql() { - let input_path = Path::new("cases").join("in").join("old_sql_system_tables2.sql"); - let mut runner = Runner::new(); - runner - .run(input_path) - .await - .expect("test failed"); - runner - .flush() - .expect("flush worked"); -} - -#[tokio::test] -// Tests from "old_stats_plans.sql", -async fn test_cases_old_stats_plans_sql() { - let input_path = Path::new("cases").join("in").join("old_stats_plans.sql"); +// Tests from "duplicates.sql", +async fn test_cases_duplicates_sql() { + let input_path = Path::new("cases").join("in").join("duplicates.sql"); let mut runner = Runner::new(); runner .run(input_path) diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index c4934b2f5e..e1506e3e3c 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -52,12 +52,8 @@ pub fn get_all_setups() -> &'static HashMap> { register_setup!(TwoMeasurementsManyFields), register_setup!(TwoMeasurementsPredicatePushDown), register_setup!(TwoMeasurementsManyFieldsOneChunk), - register_setup!(OldTwoMeasurementsManyFieldsTwoChunks), - register_setup!(OldTwoMeasurementsManyFieldsOneRubChunk), register_setup!(OneMeasurementFourChunksWithDuplicates), - register_setup!(OldOneMeasurementFourChunksWithDuplicates), register_setup!(OneMeasurementAllChunksDropped), - register_setup!(ChunkOrder), register_setup!(ThreeDeleteThreeChunks), register_setup!(OneDeleteSimpleExprOneChunkDeleteAll), register_setup!(OneDeleteSimpleExprOneChunk), diff --git a/query_tests/src/scenarios/delete.rs b/query_tests/src/scenarios/delete.rs index 3347cb6d06..a6b480224f 100644 --- a/query_tests/src/scenarios/delete.rs +++ b/query_tests/src/scenarios/delete.rs @@ -7,11 +7,7 @@ use async_trait::async_trait; use super::util::{make_n_chunks_scenario_new, ChunkDataNew, DeleteTimeNew, PredNew}; use super::{DbScenario, DbSetup}; -use crate::scenarios::util::{ - all_scenarios_for_one_chunk, make_different_stage_chunks_with_deletes_scenario_old, - make_os_chunks_and_then_compact_with_different_scenarios_with_delete, ChunkDataOld, - ChunkStageOld, -}; +use crate::scenarios::util::all_scenarios_for_one_chunk; // ========================================================================================================================= // DELETE TEST SETUPS: chunk lp data, how many chunks, their types, how many delete predicates and when they happen @@ -255,225 +251,49 @@ impl DbSetup for ThreeDeleteThreeChunks { )], }; - // ---------------------- - // 3 chunks: MUB, RUB, OS - let lp = vec![ - ChunkDataOld { - lp_lines: lp_lines_1.clone(), - chunk_stage: ChunkStageOld::Os, + //let preds = vec![&pred1, &pred2, &pred3]; + let preds = vec![ + PredNew { + predicate: &pred1, + delete_time: DeleteTimeNew::End, }, - ChunkDataOld { - lp_lines: lp_lines_2.clone(), - chunk_stage: ChunkStageOld::Rub, + PredNew { + predicate: &pred2, + delete_time: DeleteTimeNew::End, }, - ChunkDataOld { - lp_lines: lp_lines_3.clone(), - chunk_stage: ChunkStageOld::Mubo, + PredNew { + predicate: &pred3, + delete_time: DeleteTimeNew::End, }, ]; - let preds = vec![&pred1, &pred2, &pred3]; - let scenario_mub_rub_os = make_different_stage_chunks_with_deletes_scenario_old( - lp, - preds.clone(), - table_name, - partition_key, - ) - .await; - // ---------------------- - // 3 chunks: 1 MUB open, 1 MUB frozen, 1 RUB - let lp = vec![ - ChunkDataOld { - lp_lines: lp_lines_1.clone(), - chunk_stage: ChunkStageOld::Rub, - }, - ChunkDataOld { - lp_lines: lp_lines_2.clone(), - chunk_stage: ChunkStageOld::Mubf, - }, - ChunkDataOld { - lp_lines: lp_lines_3.clone(), - chunk_stage: ChunkStageOld::Mubo, - }, - ]; - let scenario_2mub_rub = make_different_stage_chunks_with_deletes_scenario_old( - lp, - preds.clone(), - table_name, - partition_key, - ) - .await; - - // ---------------------- - // 3 chunks: 2 MUB, 1 OS - let lp = vec![ - ChunkDataOld { - lp_lines: lp_lines_1.clone(), - chunk_stage: ChunkStageOld::Os, - }, - ChunkDataOld { - lp_lines: lp_lines_2.clone(), - chunk_stage: ChunkStageOld::Mubf, - }, - ChunkDataOld { - lp_lines: lp_lines_3.clone(), - chunk_stage: ChunkStageOld::Mubo, - }, - ]; - let scenario_2mub_os = make_different_stage_chunks_with_deletes_scenario_old( - lp, - preds.clone(), - table_name, - partition_key, - ) - .await; - - // ---------------------- - // 3 chunks: 2 RUB, 1 OS - let lp = vec![ - ChunkDataOld { - lp_lines: lp_lines_1.clone(), - chunk_stage: ChunkStageOld::Os, - }, - ChunkDataOld { - lp_lines: lp_lines_2.clone(), - chunk_stage: ChunkStageOld::Rub, - }, - ChunkDataOld { - lp_lines: lp_lines_3.clone(), - chunk_stage: ChunkStageOld::Rub, - }, - ]; - let scenario_2rub_os = make_different_stage_chunks_with_deletes_scenario_old( - lp, - preds.clone(), - table_name, - partition_key, - ) - .await; - - // ---------------------- - // 3 chunks: RUB, 2 OS - let lp = vec![ - ChunkDataOld { - lp_lines: lp_lines_1.clone(), - chunk_stage: ChunkStageOld::Os, - }, - ChunkDataOld { - lp_lines: lp_lines_2.clone(), - chunk_stage: ChunkStageOld::Os, - }, - ChunkDataOld { - lp_lines: lp_lines_3.clone(), - chunk_stage: ChunkStageOld::Rub, - }, - ]; - let scenario_rub_2os = make_different_stage_chunks_with_deletes_scenario_old( - lp, - preds.clone(), - table_name, - partition_key, - ) - .await; - - // ---------------------- - // 3 chunks: 3 OS - let lp = vec![ - ChunkDataOld { - lp_lines: lp_lines_1.clone(), - chunk_stage: ChunkStageOld::Os, - }, - ChunkDataOld { - lp_lines: lp_lines_2.clone(), - chunk_stage: ChunkStageOld::Os, - }, - ChunkDataOld { - lp_lines: lp_lines_3.clone(), - chunk_stage: ChunkStageOld::Os, - }, - ]; - let scenario_3os = make_different_stage_chunks_with_deletes_scenario_old( - lp, - preds.clone(), - table_name, - partition_key, - ) - .await; - - // ---------------------- - // A few more scenarios to compact all 3 OS chunk or the fist 2 OS chunks - // with delete before or after the os_compaction - let compact_os_scenarios = - make_os_chunks_and_then_compact_with_different_scenarios_with_delete( - vec![lp_lines_1.clone(), lp_lines_2.clone(), lp_lines_3.clone()], - preds.clone(), - table_name, + // Scenarios + // All threee deletes will be applied to every chunk but due to their predicates, + // only appropriate data is deleted + let scenarios = make_n_chunks_scenario_new(&[ + ChunkDataNew { + lp_lines: lp_lines_1, + preds: preds.clone(), + delete_table_name: Some(table_name), partition_key, - ) - .await; - - // return scenarios to run queries - let mut scenarios = vec![ - scenario_mub_rub_os, - scenario_2mub_rub, - scenario_2mub_os, - scenario_2rub_os, - scenario_rub_2os, - scenario_3os, - ]; - - scenarios.extend(compact_os_scenarios.into_iter()); - scenarios.append( - &mut make_n_chunks_scenario_new(&[ - ChunkDataNew { - lp_lines: lp_lines_1, - preds: vec![ - PredNew { - predicate: &pred1, - delete_time: DeleteTimeNew::End, - }, - PredNew { - predicate: &pred2, - delete_time: DeleteTimeNew::End, - }, - PredNew { - predicate: &pred3, - delete_time: DeleteTimeNew::End, - }, - ], - delete_table_name: Some(table_name), - partition_key, - ..Default::default() - }, - ChunkDataNew { - lp_lines: lp_lines_2, - preds: vec![ - PredNew { - predicate: &pred2, - delete_time: DeleteTimeNew::End, - }, - PredNew { - predicate: &pred3, - delete_time: DeleteTimeNew::End, - }, - ], - delete_table_name: Some(table_name), - partition_key, - ..Default::default() - }, - ChunkDataNew { - lp_lines: lp_lines_3, - preds: vec![PredNew { - predicate: &pred3, - delete_time: DeleteTimeNew::End, - }], - delete_table_name: Some(table_name), - partition_key, - ..Default::default() - }, - ]) - .await, - ); + ..Default::default() + }, + ChunkDataNew { + lp_lines: lp_lines_2, + preds: preds.clone(), + delete_table_name: Some(table_name), + partition_key, + ..Default::default() + }, + ChunkDataNew { + lp_lines: lp_lines_3, + preds, + delete_table_name: Some(table_name), + partition_key, + ..Default::default() + }, + ]) + .await; scenarios } diff --git a/query_tests/src/scenarios/library.rs b/query_tests/src/scenarios/library.rs index e3cb1327dd..7e893a9cb4 100644 --- a/query_tests/src/scenarios/library.rs +++ b/query_tests/src/scenarios/library.rs @@ -9,100 +9,16 @@ use db::{ utils::{ count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db, }, - LockableChunk, LockablePartition, }; use query::{frontend::sql::SqlQueryPlanner, QueryChunk}; use crate::scenarios::util::{make_n_chunks_scenario_new, ChunkDataNew}; use super::{ - util::{ - all_scenarios_for_one_chunk, make_one_rub_or_parquet_chunk_scenario, - make_two_chunk_scenarios, ChunkStageNew, - }, + util::{all_scenarios_for_one_chunk, make_two_chunk_scenarios, ChunkStageNew}, DbScenario, DbSetup, }; -/// No data -#[derive(Debug)] -pub struct ChunkOrder {} -#[async_trait] -impl DbSetup for ChunkOrder { - async fn make(&self) -> Vec { - let partition_key = "1970-01-01T00"; - let table_name = "cpu"; - - let db = make_db().await.db; - - // create first chunk: data->MUB->RUB - write_lp(&db, "cpu,region=west user=1 100"); - assert_eq!(count_mutable_buffer_chunks(&db), 1); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 0); - db.compact_partition(table_name, partition_key) - .await - .unwrap(); - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 0); - - // We prepare a persist, then drop the locks, perform another write, re-acquire locks - // and start a persist operation. In practice the lifecycle doesn't drop the locks - // before starting the persist operation, but this allows us to deterministically - // interleave a persist with a write - let partition = db.lockable_partition(table_name, partition_key).unwrap(); - let (chunks, flush_handle) = { - let partition = partition.read(); - let chunks = LockablePartition::chunks(&partition); - let mut partition = partition.upgrade(); - let flush_handle = LockablePartition::prepare_persist(&mut partition, true).unwrap(); - - (chunks, flush_handle) - }; - - // create second chunk: data->MUB - write_lp(&db, "cpu,region=west user=2 100"); - assert_eq!(count_mutable_buffer_chunks(&db), 1); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 0); - - let tracker = { - let partition = partition.write(); - let chunks = chunks.iter().map(|chunk| chunk.write()).collect(); - LockablePartition::persist_chunks(partition, chunks, flush_handle).unwrap() - }; - - tracker.join().await; - assert!(tracker.get_status().result().unwrap().success()); - - assert_eq!(count_mutable_buffer_chunks(&db), 1); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 1); - - // Now we have the the following chunks (same partition and table): - // - // | ID | order | tag: region | field: user | time | - // | -- | ----- | ----------- | ----------- | ---- | - // | 1 | 1 | "west" | 2 | 100 | - // | 2 | 0 | "west" | 1 | 100 | - // - // The result after deduplication should be: - // - // | tag: region | field: user | time | - // | ----------- | ----------- | ---- | - // | "west" | 2 | 100 | - // - // So the query engine must use `order` as a primary key to sort chunks, NOT `id`. - - let scenario = DbScenario { - scenario_name: "chunks where chunk ID alone cannot be used for ordering".into(), - db, - }; - - vec![scenario] - } -} - #[derive(Debug)] pub struct MeasurementWithMaxTime {} #[async_trait] @@ -121,6 +37,8 @@ impl DbSetup for MeasurementWithMaxTime { } } +// TODO: rewrite this for NG. Need to see whether we will need to test empty DB and if so how +// /// No data #[derive(Debug)] pub struct NoData {} @@ -496,7 +414,7 @@ impl DbSetup for TwoMeasurementsPredicatePushDown { async fn make(&self) -> Vec { let partition_key = "1970-01-01T00"; - let lp_lines = vec![ + let lp_lines1 = vec![ "restaurant,town=andover count=40000u,system=5.0 100", "restaurant,town=reading count=632u,system=5.0 120", "restaurant,town=bedford count=189u,system=7.0 110", @@ -504,11 +422,13 @@ impl DbSetup for TwoMeasurementsPredicatePushDown { "restaurant,town=lexington count=372u,system=5.0 100", "restaurant,town=lawrence count=872u,system=6.0 110", "restaurant,town=reading count=632u,system=6.0 130", + ]; + let lp_lines2 = vec![ "school,town=reading count=17u,system=6.0 150", "school,town=andover count=25u,system=6.0 160", ]; - make_one_rub_or_parquet_chunk_scenario(partition_key, &lp_lines.join("\n")).await + make_two_chunk_scenarios(partition_key, &lp_lines1.join("\n"), &lp_lines2.join("\n")).await } } @@ -598,75 +518,6 @@ impl DbSetup for TwoMeasurementsManyFieldsOneChunk { } } -#[derive(Debug)] -/// This has a single chunk for queries that check the state of the system -/// -/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables. -pub struct OldTwoMeasurementsManyFieldsOneRubChunk {} -#[async_trait] -impl DbSetup for OldTwoMeasurementsManyFieldsOneRubChunk { - async fn make(&self) -> Vec { - let db = make_db().await.db; - let partition_key = "1970-01-01T00"; - - let lp_lines = vec![ - "h2o,state=MA,city=Boston temp=70.4 50", - "h2o,state=MA,city=Boston other_temp=70.4 250", - "h2o,state=CA,city=Boston other_temp=72.4 350", - "o2,state=MA,city=Boston temp=53.4,reading=51 50", - "o2,state=CA temp=79.0 300", - ]; - - write_lp(&db, &lp_lines.join("\n")); - - // move all data to RUB - db.compact_open_chunk("h2o", partition_key).await.unwrap(); - - vec![DbScenario { - scenario_name: "Data in single chunk of read buffer".into(), - db, - }] - } -} - -#[derive(Debug)] -/// This has two chunks for queries that check the state of the system -/// -/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables. -pub struct OldTwoMeasurementsManyFieldsTwoChunks {} -#[async_trait] -impl DbSetup for OldTwoMeasurementsManyFieldsTwoChunks { - async fn make(&self) -> Vec { - let db = make_db().await.db; - - let partition_key = "1970-01-01T00"; - - let lp_lines = vec![ - "h2o,state=MA,city=Boston temp=70.4 50", - "h2o,state=MA,city=Boston other_temp=70.4 250", - ]; - write_lp(&db, &lp_lines.join("\n")); - db.compact_partition("h2o", partition_key).await.unwrap(); - - let lp_lines = vec![ - "h2o,state=CA,city=Boston other_temp=72.4 350", - "o2,state=MA,city=Boston temp=53.4,reading=51 50", - "o2,state=CA temp=79.0 300", - ]; - write_lp(&db, &lp_lines.join("\n")); - - assert_eq!(count_mutable_buffer_chunks(&db), 2); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 0); - - vec![DbScenario { - scenario_name: "Data in two open mutable buffer chunks per table and read buffer" - .into(), - db, - }] - } -} - #[derive(Debug)] /// This has two chunks for queries that check the state of the system /// @@ -755,8 +606,6 @@ pub struct OneMeasurementFourChunksWithDuplicates {} #[async_trait] impl DbSetup for OneMeasurementFourChunksWithDuplicates { async fn make(&self) -> Vec { - let db = make_db().await.db; - let partition_key = "1970-01-01T00"; // Chunk 1: @@ -768,8 +617,6 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates { "h2o,state=MA,city=Boston max_temp=75.4 250", "h2o,state=MA,city=Andover max_temp=69.2, 250", ]; - write_lp(&db, &lp_lines1.join("\n")); - db.compact_open_chunk("h2o", partition_key).await.unwrap(); // Chunk 2: overlaps with chunk 1 // . time range: 150 - 300 @@ -782,8 +629,6 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates { "h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300", "h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 350", ]; - write_lp(&db, &lp_lines2.join("\n")); - db.compact_open_chunk("h2o", partition_key).await.unwrap(); // Chunk 3: no overlap // . time range: 400 - 500 @@ -796,8 +641,6 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates { "h2o,state=CA,city=SJ min_temp=77.0,max_temp=90.7 450", "h2o,state=CA,city=SJ min_temp=69.5,max_temp=88.2 500", ]; - write_lp(&db, &lp_lines3.join("\n")); - db.compact_open_chunk("h2o", partition_key).await.unwrap(); // Chunk 4: no overlap // . time range: 600 - 700 @@ -810,109 +653,35 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates { "h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 650", "h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700", ]; - write_lp(&db, &lp_lines4.join("\n")); - db.compact_open_chunk("h2o", partition_key).await.unwrap(); - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 4); - assert_eq!(count_object_store_chunks(&db), 0); + let scenarios = make_n_chunks_scenario_new(&[ + ChunkDataNew { + lp_lines: lp_lines1, + partition_key, + ..Default::default() + }, + ChunkDataNew { + lp_lines: lp_lines2, + partition_key, + ..Default::default() + }, + ChunkDataNew { + lp_lines: lp_lines3, + partition_key, + ..Default::default() + }, + ChunkDataNew { + lp_lines: lp_lines4, + partition_key, + ..Default::default() + }, + ]) + .await; - let mut scenarios = vec![DbScenario { - scenario_name: "Data in four chunks with duplicates".into(), - db, - }]; - scenarios.append( - &mut make_n_chunks_scenario_new(&[ - ChunkDataNew { - lp_lines: lp_lines1, - partition_key, - ..Default::default() - }, - ChunkDataNew { - lp_lines: lp_lines2, - partition_key, - ..Default::default() - }, - ChunkDataNew { - lp_lines: lp_lines3, - partition_key, - ..Default::default() - }, - ChunkDataNew { - lp_lines: lp_lines4, - partition_key, - ..Default::default() - }, - ]) - .await, - ); scenarios } } -#[derive(Debug)] -/// Setup for four chunks with duplicates for deduplicate tests -/// -/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables. -pub struct OldOneMeasurementFourChunksWithDuplicates {} -#[async_trait] -impl DbSetup for OldOneMeasurementFourChunksWithDuplicates { - async fn make(&self) -> Vec { - let scenarios: Vec<_> = OneMeasurementFourChunksWithDuplicates {} - .make() - .await - .into_iter() - .filter(|s| s.scenario_name == "Data in four chunks with duplicates") - .collect(); - assert_eq!(scenarios.len(), 1); - scenarios - } -} - -#[derive(Debug)] -/// This has a single scenario with all the life cycle operations to -/// test queries that depend on that -/// -/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables. -pub struct OldTwoMeasurementsManyFieldsLifecycle {} -#[async_trait] -impl DbSetup for OldTwoMeasurementsManyFieldsLifecycle { - async fn make(&self) -> Vec { - let partition_key = "1970-01-01T00"; - - let db = make_db().await.db; - - write_lp( - &db, - &vec![ - "h2o,state=MA,city=Boston temp=70.4 50", - "h2o,state=MA,city=Boston other_temp=70.4 250", - ] - .join("\n"), - ); - - db.compact_open_chunk("h2o", partition_key).await.unwrap(); - - db.persist_partition("h2o", partition_key, true) - .await - .unwrap(); - - write_lp( - &db, - &vec!["h2o,state=CA,city=Boston other_temp=72.4 350"].join("\n"), - ); - - assert_eq!(count_mutable_buffer_chunks(&db), 1); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 1); - - vec![DbScenario { - scenario_name: "Data in parquet, RUB, and MUB".into(), - db, - }] - } -} - #[derive(Debug)] pub struct OneMeasurementManyFields {} #[async_trait] diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index d4043c129d..d3d6caf041 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -3,17 +3,11 @@ use super::DbScenario; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use backoff::BackoffConfig; -use data_types::{chunk_metadata::ChunkId, delete_predicate::DeletePredicate}; +use data_types::delete_predicate::DeletePredicate; use data_types2::{ IngesterQueryRequest, NonEmptyString, PartitionId, Sequence, SequenceNumber, SequencerId, TombstoneId, }; -use db::test_helpers::chunk_ids_rub; -use db::{ - test_helpers::write_lp, - utils::{count_mub_table_chunks, count_os_table_chunks, count_rub_table_chunks, make_db}, - Db, -}; use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite}; use futures::StreamExt; use generated_types::influxdata::iox::ingester::v1::{ @@ -33,7 +27,6 @@ use querier::{ IngesterConnectionImpl, IngesterFlightClient, IngesterFlightClientError, IngesterFlightClientQueryData, QuerierCatalogCache, QuerierNamespace, }; -use query::QueryChunk; use schema::selection::Selection; use std::cmp::Ordering; use std::collections::{BTreeMap, HashMap}; @@ -45,14 +38,6 @@ use std::{fmt::Display, sync::Arc}; // & when delete predicates are applied // STRUCTs & ENUMs -#[derive(Debug, Clone)] -pub struct ChunkDataOld<'a> { - /// Line protocol data of this chunk - pub lp_lines: Vec<&'a str>, - /// which stage this chunk will be created - pub chunk_stage: ChunkStageOld, -} - #[derive(Debug, Clone, Default)] pub struct ChunkDataNew<'a, 'b> { /// Line protocol data of this chunk @@ -103,39 +88,6 @@ impl<'a, 'b> ChunkDataNew<'a, 'b> { } } -#[derive(Debug, Clone, Copy)] -pub enum ChunkStageOld { - /// Open MUB - Mubo, - /// Frozen MUB - Mubf, - /// RUB without OS - Rub, - /// both RUB and OS of the chunk exist - RubOs, - /// OS only - Os, -} - -impl Display for ChunkStageOld { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Mubo => write!(f, "Open MUB"), - Self::Mubf => write!(f, "Frozen MUB"), - Self::Rub => write!(f, "RUB"), - Self::RubOs => write!(f, "RUB & OS"), - Self::Os => write!(f, "OS"), - } - } -} - -impl ChunkStageOld { - /// return the list of all chunk types - pub fn all() -> Vec { - vec![Self::Mubo, Self::Mubf, Self::Rub, Self::RubOs, Self::Os] - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ChunkStageNew { /// In parquet file. @@ -179,14 +131,12 @@ impl ChunkStageNew { #[derive(Debug, Clone, Copy)] pub enum ChunkStage { - Old(ChunkStageOld), New(ChunkStageNew), } impl Display for ChunkStage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Old(stage) => write!(f, "Old: {}", stage), Self::New(stage) => write!(f, "New: {}", stage), } } @@ -195,22 +145,13 @@ impl Display for ChunkStage { impl ChunkStage { /// return the list of all chunk types pub fn all() -> Vec { - ChunkStageOld::all() + ChunkStageNew::all() .into_iter() - .map(ChunkStage::Old) - .chain(ChunkStageNew::all().into_iter().map(ChunkStage::New)) + .map(ChunkStage::New) .collect() } } -#[derive(Debug, Clone)] -pub struct PredOld<'a> { - /// Delete predicate - predicate: &'a DeletePredicate, - /// At which chunk stage this predicate is applied - delete_time: DeleteTimeOld, -} - #[derive(Debug, Clone)] pub struct PredNew<'a> { /// Delete predicate @@ -232,17 +173,12 @@ impl<'a> PredNew<'a> { #[derive(Debug, Clone)] pub enum Pred<'a> { - Old(PredOld<'a>), New(PredNew<'a>), } impl<'a> Pred<'a> { fn new(predicate: &'a DeletePredicate, delete_time: DeleteTime) -> Self { match delete_time { - DeleteTime::Old(delete_time) => Self::Old(PredOld { - predicate, - delete_time, - }), DeleteTime::New(delete_time) => Self::New(PredNew { predicate, delete_time, @@ -251,46 +187,6 @@ impl<'a> Pred<'a> { } } -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum DeleteTimeOld { - /// Delete predicate happens after all chunks created - /// and moved to their corresponding stages - End, - /// Delete predicate is added to chunks at their Mub Open stage - Mubo, - /// Delete predicate is added to chunks at their Mub Frozen stage - Mubf, - /// Delete predicate is added to chunks at their Rub stage - Rub, - /// Delete predicate is added to chunks at their Rub & Os stage - RubOs, - /// Delete predicate is added to chunks at their Os stage - Os, -} - -impl DeleteTimeOld { - /// Return all DeleteTime at and after the given chunk stage - pub fn all_from_and_before(chunk_stage: ChunkStageOld) -> Vec { - match chunk_stage { - ChunkStageOld::Mubo => vec![Self::Mubo], - ChunkStageOld::Mubf => vec![Self::Mubo, Self::Mubf], - ChunkStageOld::Rub => { - vec![Self::Mubo, Self::Mubf, Self::Rub] - } - ChunkStageOld::RubOs => vec![Self::Mubo, Self::Mubf, Self::Rub, Self::RubOs], - ChunkStageOld::Os => vec![Self::Mubo, Self::Mubf, Self::Rub, Self::RubOs, Self::Os], - } - } - - pub fn begin() -> Self { - Self::Mubo - } - - pub fn end() -> Self { - Self::End - } -} - /// Describes when a delete predicate was applied. /// /// # Ordering @@ -392,7 +288,6 @@ impl Display for DeleteTimeNew { #[derive(Debug, Clone, Copy, PartialEq)] pub enum DeleteTime { - Old(DeleteTimeOld), New(DeleteTimeNew), } @@ -400,10 +295,6 @@ impl DeleteTime { /// Return all DeleteTime at and after the given chunk stage fn all_from_and_before(chunk_stage: ChunkStage) -> Vec { match chunk_stage { - ChunkStage::Old(chunk_stage) => DeleteTimeOld::all_from_and_before(chunk_stage) - .into_iter() - .map(Self::Old) - .collect(), ChunkStage::New(chunk_stage) => DeleteTimeNew::all_from_and_before(chunk_stage) .into_iter() .map(Self::New) @@ -413,14 +304,12 @@ impl DeleteTime { fn begin_for(chunk_stage: ChunkStage) -> Self { match chunk_stage { - ChunkStage::Old(_) => Self::Old(DeleteTimeOld::begin()), ChunkStage::New(chunk_stage) => Self::New(DeleteTimeNew::begin_for(chunk_stage)), } } fn end_for(chunk_stage: ChunkStage) -> Self { match chunk_stage { - ChunkStage::Old(_) => Self::Old(DeleteTimeOld::end()), ChunkStage::New(chunk_stage) => Self::New(DeleteTimeNew::end_for(chunk_stage)), } } @@ -498,29 +387,10 @@ async fn make_chunk_with_deletes_at_different_stages( partition_key: &str, ) -> DbScenario { match chunk_stage { - ChunkStage::Old(chunk_stage) => { - let preds: Vec<_> = preds - .into_iter() - .map(|p| match p { - Pred::Old(pred) => pred, - Pred::New(_) => panic!("mixed new and old"), - }) - .collect(); - - make_chunk_with_deletes_at_different_stages_old( - lp_lines, - chunk_stage, - preds, - delete_table_name, - partition_key, - ) - .await - } ChunkStage::New(chunk_stage) => { let preds: Vec<_> = preds .into_iter() .map(|p| match p { - Pred::Old(_) => panic!("mixed new and old"), Pred::New(pred) => pred, }) .collect(); @@ -537,242 +407,6 @@ async fn make_chunk_with_deletes_at_different_stages( } } -async fn make_chunk_with_deletes_at_different_stages_old( - lp_lines: Vec<&str>, - chunk_stage: ChunkStageOld, - preds: Vec>, - delete_table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - - // ---------------------- - // Make an open MUB - // - // There may be more than one tables in the lp data - let tables = write_lp(&db, &lp_lines.join("\n")); - for table in &tables { - let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key); - // must be one MUB per table - assert_eq!(num_mubs, 1); - } - // Apply delete predicate - let mut deleted = false; - let mut display = "".to_string(); - let mut count = 0; - for pred in &preds { - if pred.delete_time == DeleteTimeOld::Mubo { - db.delete(delete_table_name, Arc::new(pred.predicate.clone())) - .unwrap(); - deleted = true; - count += 1; - } - } - if count > 0 { - display.push_str(format!(", with {} deletes from open MUB", count).as_str()); - } - - // ---------------------- - // Freeze MUB if requested - match chunk_stage { - ChunkStageOld::Mubf | ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => { - // Since mub are frozen at delete, no need to do it in that case for table of deleted data - if !deleted { - db.rollover_partition(delete_table_name, partition_key) - .await - .unwrap() - .unwrap(); - } - - // Freeze MUBs of tables that not have deleted/deleting data - for table in &tables { - if table != delete_table_name { - db.rollover_partition(table.as_str(), partition_key) - .await - .unwrap() - .unwrap(); - } - } - - // Verify still one MUB and no RUB for each table - for table in &tables { - let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key); - let num_rubs = count_rub_table_chunks(&db, table.as_str(), partition_key); - assert_eq!(num_mubs, 1); - assert_eq!(num_rubs, 0); - } - } - _ => {} - } - // Apply delete predicate - count = 0; - for pred in &preds { - if pred.delete_time == DeleteTimeOld::Mubf { - db.delete(delete_table_name, Arc::new(pred.predicate.clone())) - .unwrap(); - count += 1; - } - } - if count > 0 { - display.push_str(format!(", with {} deletes from frozen MUB", count).as_str()); - } - - // ---------------------- - // Move MUB to RUB if requested - match chunk_stage { - ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => { - let mut no_more_data = false; - for table in &tables { - // Compact this MUB of this table - let chunk_result = db.compact_partition(table, partition_key).await.unwrap(); - - // Verify no MUB and one RUB if not all data was soft deleted - let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key); - let num_rubs = count_rub_table_chunks(&db, table.as_str(), partition_key); - assert_eq!(num_mubs, 0); - - // Stop if compaction result is nothing which means MUB has - // all soft deleted data and no RUB is created. No more data - // to affect further delete - if table == delete_table_name { - match chunk_result { - Some(_chunk) => { - assert_eq!(num_rubs, 1); - } - None => { - assert_eq!(num_rubs, 0); - no_more_data = true; - } - } - } else { - assert_eq!(num_rubs, 1); - } - } - - if no_more_data { - let scenario_name = - format!("Deleted data from one {} chunk{}", chunk_stage, display); - return DbScenario { scenario_name, db }; - } - } - _ => {} - } - // Apply delete predicate - count = 0; - for pred in &preds { - if pred.delete_time == DeleteTimeOld::Rub { - db.delete(delete_table_name, Arc::new(pred.predicate.clone())) - .unwrap(); - count += 1; - } - } - if count > 0 { - display.push_str(format!(", with {} deletes from RUB", count).as_str()); - } - - // ---------------------- - // Persist RUB to OS if requested - match chunk_stage { - ChunkStageOld::RubOs | ChunkStageOld::Os => { - let mut no_more_data = false; - for table in &tables { - // Persist RUB of this table - let chunk_result = db - .persist_partition(table, partition_key, true) - .await - .unwrap(); - - // Verify no MUB and one RUB if not all data was soft deleted - let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key); - let num_rubs = count_rub_table_chunks(&db, table.as_str(), partition_key); - let num_os = count_os_table_chunks(&db, table.as_str(), partition_key); - assert_eq!(num_mubs, 0); - - // For delete table, two different things can happen - if table == delete_table_name { - match chunk_result { - // Not all rows were soft deleted - Some(_chunk) => { - assert_eq!(num_rubs, 1); // still have RUB with the persisted OS - assert_eq!(num_os, 1); - } - // All rows in the RUB were soft deleted. There is nothing after compacting and hence - // nothing will be left and persisted. - None => { - assert_eq!(num_rubs, 0); - assert_eq!(num_os, 0); - no_more_data = true; - } - } - } else { - assert_eq!(num_rubs, 1); - assert_eq!(num_os, 1); - } - } - - if no_more_data { - let scenario_name = - format!("Deleted data from one {} chunk{}", chunk_stage, display); - return DbScenario { scenario_name, db }; - } - } - _ => {} - } - // Apply delete predicate - count = 0; - for pred in &preds { - if pred.delete_time == DeleteTimeOld::RubOs { - db.delete(delete_table_name, Arc::new(pred.predicate.clone())) - .unwrap(); - count = 1; - } - } - if count > 0 { - display.push_str(format!(", with {} deletes from RUB & OS", count).as_str()); - } - - // ---------------------- - // Unload RUB - if let ChunkStageOld::Os = chunk_stage { - for table in &tables { - // retrieve its chunk_id first - let rub_chunk_ids = chunk_ids_rub(&db, Some(table.as_str()), Some(partition_key)); - assert_eq!(rub_chunk_ids.len(), 1); - db.unload_read_buffer(table.as_str(), partition_key, rub_chunk_ids[0]) - .unwrap(); - - // verify chunk stages - let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key); - let num_rubs = count_rub_table_chunks(&db, table.as_str(), partition_key); - let num_os = count_os_table_chunks(&db, table.as_str(), partition_key); - assert_eq!(num_mubs, 0); - assert_eq!(num_rubs, 0); - assert_eq!(num_os, 1); - } - } - // Apply delete predicate - count = 0; - for pred in &preds { - if pred.delete_time == DeleteTimeOld::Os || pred.delete_time == DeleteTimeOld::End { - db.delete(delete_table_name, Arc::new(pred.predicate.clone())) - .unwrap(); - count += 1; - } - } - if count > 0 { - display.push_str( - format!( - ", with {} deletes from OS or after all chunks are created", - count - ) - .as_str(), - ); - } - - let scenario_name = format!("Deleted data from one {} chunk{}", chunk_stage, display); - DbScenario { scenario_name, db } -} - async fn make_chunk_with_deletes_at_different_stages_new( lp_lines: Vec<&str>, chunk_stage: ChunkStageNew, @@ -795,205 +429,6 @@ async fn make_chunk_with_deletes_at_different_stages_new( DbScenario { scenario_name, db } } -/// Build many chunks which are in different stages -// Note that, after a lot of thoughts, I decided to have 2 separated functions, this one and the one above. -// The above tests delete predicates before and/or after a chunk is moved to different stages, while -// this function tests different-stage chunks in various stages when one or many deletes happen. -// Even though these 2 functions have some overlapped code, merging them in one -// function will created a much more complicated cases to handle -pub async fn make_different_stage_chunks_with_deletes_scenario_old( - data: Vec>, - preds: Vec<&DeletePredicate>, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - let mut display = "".to_string(); - - // Build chunks - for chunk_data in &data { - display.push_str(" - "); - display.push_str(&chunk_data.chunk_stage.to_string()); - - // ---------- - // Make an open MUB - write_lp(&db, &chunk_data.lp_lines.join("\n")); - // 0 does not represent the real chunk id. It is here just to initialize the chunk_id variable for later assignment - let mut chunk_id = db.chunk_summaries()[0].id; - - // ---------- - // freeze MUB - match chunk_data.chunk_stage { - ChunkStageOld::Mubf | ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => { - let chunk = db - .rollover_partition(table_name, partition_key) - .await - .unwrap() - .unwrap(); - chunk_id = chunk.id(); - } - _ => {} - } - - // ---------- - // Move MUB to RUB - match chunk_data.chunk_stage { - ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => { - let chunk = db - .compact_chunks(table_name, partition_key, |chunk| chunk.id() == chunk_id) - .await - .unwrap() - .unwrap(); - chunk_id = chunk.id(); - } - _ => {} - } - - // ---------- - // Move RUB to OS - match chunk_data.chunk_stage { - ChunkStageOld::RubOs | ChunkStageOld::Os => { - let chunk = db - .persist_partition(table_name, partition_key, true) - .await - .unwrap() - .unwrap(); - chunk_id = chunk.id(); - } - _ => {} - } - - // ---------- - // Unload RUB - if let ChunkStageOld::Os = chunk_data.chunk_stage { - db.unload_read_buffer(table_name, partition_key, chunk_id) - .unwrap(); - } - } - - // ---------- - // Apply all delete predicates - for pred in &preds { - db.delete(table_name, Arc::new((*pred).clone())).unwrap(); - } - - // Scenario of the input chunks and delete predicates - let scenario_name = format!( - "Deleted data from {} chunks, {}, with {} deletes after all chunks are created", - data.len(), - display, - preds.len() - ); - DbScenario { scenario_name, db } -} - -pub async fn make_os_chunks_and_then_compact_with_different_scenarios_with_delete( - lp_lines_vec: Vec>, - preds: Vec<&DeletePredicate>, - table_name: &str, - partition_key: &str, -) -> Vec { - // Scenario 1: apply deletes and then compact all 3 chunks - let (db, chunk_ids) = - make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await; - for pred in &preds { - db.delete(table_name, Arc::new((*pred).clone())).unwrap(); - } - db.compact_object_store_chunks(table_name, partition_key, chunk_ids) - .unwrap() - .join() - .await; - - let scenario_name = "Deletes and then compact all OS chunks".to_string(); - let scenario_1 = DbScenario { scenario_name, db }; - - // Scenario 2: compact all 3 chunks and apply deletes - let (db, chunk_ids) = - make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await; - db.compact_object_store_chunks(table_name, partition_key, chunk_ids) - .unwrap() - .join() - .await; - for pred in &preds { - db.delete(table_name, Arc::new((*pred).clone())).unwrap(); - } - let scenario_name = "Compact all OS chunks and then deletes".to_string(); - let scenario_2 = DbScenario { scenario_name, db }; - - // Scenario 3: apply deletes then compact the first n-1 chunks - let (db, chunk_ids) = - make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await; - for pred in &preds { - db.delete(table_name, Arc::new((*pred).clone())).unwrap(); - } - let (_last_chunk_id, chunk_ids_but_last) = chunk_ids.split_last().unwrap(); - db.compact_object_store_chunks(table_name, partition_key, chunk_ids_but_last.to_vec()) - .unwrap() - .join() - .await; - let scenario_name = "Deletes and then compact all but last OS chunk".to_string(); - let scenario_3 = DbScenario { scenario_name, db }; - - // Scenario 4: compact the first n-1 chunks then apply deletes - let (db, chunk_ids) = - make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await; - let (_last_chunk_id, chunk_ids_but_last) = chunk_ids.split_last().unwrap(); - db.compact_object_store_chunks(table_name, partition_key, chunk_ids_but_last.to_vec()) - .unwrap() - .join() - .await; - for pred in &preds { - db.delete(table_name, Arc::new((*pred).clone())).unwrap(); - } - let scenario_name = "Compact all but last OS chunk and then deletes".to_string(); - let scenario_4 = DbScenario { scenario_name, db }; - - vec![scenario_1, scenario_2, scenario_3, scenario_4] -} - -async fn make_contiguous_os_chunks( - lp_lines_vec: Vec>, - table_name: &str, - partition_key: &str, -) -> (Arc, Vec) { - // This test is aimed for at least 3 chunks - assert!(lp_lines_vec.len() >= 3); - - // First make all OS chunks fot the lp_lins_vec - // Define they are OS - let mut chunk_data_vec = vec![]; - for lp_lines in lp_lines_vec { - let chunk_data = ChunkDataOld { - lp_lines: lp_lines.clone(), - chunk_stage: ChunkStageOld::Os, - }; - chunk_data_vec.push(chunk_data); - } - // Make db with those OS chunks - let scenario = make_different_stage_chunks_with_deletes_scenario_old( - chunk_data_vec, - vec![], // not delete anything yet - table_name, - partition_key, - ) - .await; - - // Get chunk ids in contiguous order - let db = Arc::downcast::(scenario.db.as_any_arc()).unwrap(); - let partition = db.partition(table_name, partition_key).unwrap(); - let partition = partition.read(); - let mut keyed_chunks: Vec<(_, _)> = partition - .keyed_chunks() - .into_iter() - .map(|(id, order, _chunk)| (id, order)) - .collect(); - keyed_chunks.sort_by(|(_id1, order1), (_id2, order2)| order1.cmp(order2)); - - let chunk_ids: Vec<_> = keyed_chunks.iter().map(|(id, _order)| *id).collect(); - - (db, chunk_ids) -} - /// This function loads two chunks of lp data into 4 different scenarios /// /// Data in single open mutable buffer chunk @@ -1005,147 +440,7 @@ pub async fn make_two_chunk_scenarios( data1: &str, data2: &str, ) -> Vec { - make_two_chunk_scenarios_old(partition_key, data1, data2) - .await - .into_iter() - .chain(make_two_chunk_scenarios_new(partition_key, data1, data2).await) - .collect() -} - -async fn make_two_chunk_scenarios_old( - partition_key: &str, - data1: &str, - data2: &str, -) -> Vec { - let db = make_db().await.db; - write_lp(&db, data1); - write_lp(&db, data2); - let scenario1 = DbScenario { - scenario_name: "Data in single open chunk of mutable buffer".into(), - db, - }; - - // spread across 2 mutable buffer chunks - let db = make_db().await.db; - let table_names = write_lp(&db, data1); - for table_name in &table_names { - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - } - write_lp(&db, data2); - let scenario2 = DbScenario { - scenario_name: "Data in one open chunk and one closed chunk of mutable buffer".into(), - db, - }; - - // spread across 1 mutable buffer, 1 read buffer chunks - let db = make_db().await.db; - let table_names = write_lp(&db, data1); - for table_name in &table_names { - db.compact_partition(table_name, partition_key) - .await - .unwrap(); - } - write_lp(&db, data2); - let scenario3 = DbScenario { - scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer".into(), - db, - }; - - // in 2 read buffer chunks - let db = make_db().await.db; - let table_names = write_lp(&db, data1); - for table_name in &table_names { - db.compact_partition(table_name, partition_key) - .await - .unwrap(); - } - let table_names = write_lp(&db, data2); - for table_name in &table_names { - // Compact just the last chunk - db.compact_open_chunk(table_name, partition_key) - .await - .unwrap(); - } - let scenario4 = DbScenario { - scenario_name: "Data in two read buffer chunks".into(), - db, - }; - - // in 2 read buffer chunks that also loaded into object store - let db = make_db().await.db; - let table_names = write_lp(&db, data1); - for table_name in &table_names { - db.persist_partition(table_name, partition_key, true) - .await - .unwrap(); - } - let table_names = write_lp(&db, data2); - for table_name in &table_names { - db.persist_partition(table_name, partition_key, true) - .await - .unwrap(); - } - let scenario5 = DbScenario { - scenario_name: "Data in two read buffer chunks and two parquet file chunks".into(), - db, - }; - - // Scenario 6: Two closed chunk in OS only - let db = make_db().await.db; - let table_names = write_lp(&db, data1); - for table_name in &table_names { - let id = db - .persist_partition(table_name, partition_key, true) - .await - .unwrap() - .unwrap() - .id(); - db.unload_read_buffer(table_name, partition_key, id) - .unwrap(); - } - let table_names = write_lp(&db, data2); - for table_name in &table_names { - let id = db - .persist_partition(table_name, partition_key, true) - .await - .unwrap() - .unwrap() - .id(); - - db.unload_read_buffer(table_name, partition_key, id) - .unwrap(); - } - let scenario6 = DbScenario { - scenario_name: "Data in 2 parquet chunks in object store only".into(), - db, - }; - - // Scenario 7: in a single chunk resulting from compacting MUB and RUB - let db = make_db().await.db; - let table_names = write_lp(&db, data1); - for table_name in &table_names { - // put chunk 1 into RUB - db.compact_partition(table_name, partition_key) - .await - .unwrap(); - } - let table_names = write_lp(&db, data2); // write to MUB - for table_name in &table_names { - // compact chunks into a single RUB chunk - db.compact_partition(table_name, partition_key) - .await - .unwrap(); - } - let scenario7 = DbScenario { - scenario_name: "Data in one compacted read buffer chunk".into(), - db, - }; - - vec![ - scenario1, scenario2, scenario3, scenario4, scenario5, scenario6, scenario7, - ] + make_two_chunk_scenarios_new(partition_key, data1, data2).await } async fn make_two_chunk_scenarios_new( @@ -1221,45 +516,6 @@ pub async fn make_n_chunks_scenario_new(chunks: &[ChunkDataNew<'_, '_>]) -> Vec< scenarios } -// // This function loads one chunk of lp data into RUB for testing predicate pushdown -pub(crate) async fn make_one_rub_or_parquet_chunk_scenario( - partition_key: &str, - data: &str, -) -> Vec { - // Scenario 1: One closed chunk in RUB - let db = make_db().await.db; - let table_names = write_lp(&db, data); - for table_name in &table_names { - db.compact_partition(table_name, partition_key) - .await - .unwrap(); - } - let scenario1 = DbScenario { - scenario_name: "--------------------- Data in read buffer".into(), - db, - }; - - // Scenario 2: One closed chunk in Parquet only - let db = make_db().await.db; - let table_names = write_lp(&db, data); - for table_name in &table_names { - let id = db - .persist_partition(table_name, partition_key, true) - .await - .unwrap() - .unwrap() - .id(); - db.unload_read_buffer(table_name, partition_key, id) - .unwrap(); - } - let scenario2 = DbScenario { - scenario_name: "--------------------- Data in object store only ".into(), - db, - }; - - vec![scenario1, scenario2] -} - /// Create given chunk using the given ingester. /// /// Returns a human-readable chunk description. diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index a17e73e2e3..148f511bbc 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -123,28 +123,6 @@ async fn sql_select_from_school() { .await; } -#[tokio::test] -async fn sql_select_from_system_operations() { - test_helpers::maybe_start_logging(); - let expected = vec![ - "+----+---------+------------+---------------+----------------+------------+---------------+-------------------------------------+", - "| id | status | start_time | took_cpu_time | took_wall_time | table_name | partition_key | description |", - "+----+---------+------------+---------------+----------------+------------+---------------+-------------------------------------+", - "| 0 | Success | true | true | true | h2o | 1970-01-01T00 | Compacting chunks to ReadBuffer |", - "| 1 | Success | true | true | true | h2o | 1970-01-01T00 | Persisting chunks to object storage |", - "| 2 | Success | true | true | true | h2o | 1970-01-01T00 | Writing chunk to Object Storage |", - "+----+---------+------------+---------------+----------------+------------+---------------+-------------------------------------+", - ]; - - // Check that the cpu time used reported is greater than zero as it isn't - // repeatable - run_sql_test_case( - OldTwoMeasurementsManyFieldsLifecycle {}, - "SELECT id, status, CAST(start_time as BIGINT) > 0 as start_time, CAST(cpu_time_used AS BIGINT) > 0 as took_cpu_time, CAST(wall_time_used AS BIGINT) > 0 as took_wall_time, table_name, partition_key, description from system.operations", - &expected - ).await; -} - #[tokio::test] async fn sql_union_all() { // validate name resolution works for UNION ALL queries