diff --git a/query/src/provider.rs b/query/src/provider.rs index b66093b6c7..a9408396b2 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -742,11 +742,11 @@ impl Deduplicater { /// ▲ /// │ /// │ - /// ┌───────────────────────┐ - /// │ FilterExec │ - /// | To apply delete preds │ - /// │ (Chunk) │ - /// └───────────────────────┘ + /// ┌─────────────────────────┐ + /// │ FilterExec (optional) │ + /// │ To apply delete preds │ + /// │ (Chunk) │ + /// └─────────────────────────┘ /// ▲ /// │ /// │ @@ -861,19 +861,7 @@ impl Deduplicater { } /// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode - /// ```text - /// ┌─────────────────┐ - /// │ SortExec │ - /// │ (optional) │ <-- Only added if the input output_sort_key is not empty - /// └─────────────────┘ - /// ▲ - /// │ - /// │ - /// ┌─────────────────┐ - /// │IOxReadFilterNode│ - /// │ (Chunk) │ - /// └─────────────────┘ - ///``` + /// plus some optional operators on top, such as applying delete predicates or sorting the chunk fn build_plan_for_non_duplicates_chunk( table_name: Arc, output_schema: Arc, @@ -890,30 +878,28 @@ impl Deduplicater { ) } - /// Return either: - /// the simplest IOx scan plan for many chunks which is IOxReadFilterNode - /// if the input output_sort_key is empty - /// ```text - /// ┌─────────────────┐ - /// │IOxReadFilterNode│ - /// │ (Many Chunks) │ - /// └─────────────────┘ - ///``` - /// - /// Otherwise, many plans like this + /// Return one plan per chunk, each shaped like this: /// /// ```text - /// ┌─────────────────┐ ┌─────────────────┐ - /// │ SortExec │ │ SortExec │ - /// │ (optional) │ │ (optional) │ - /// └─────────────────┘ └─────────────────┘ - /// ▲ ▲ - /// │ ..... 
│ - /// │ │ - /// ┌─────────────────┐ ┌─────────────────┐ - /// │IOxReadFilterNode│ │IOxReadFilterNode│ - /// │ (Chunk 1) │ │ (Chunk n) │ - /// └─────────────────┘ └─────────────────┘ + /// ┌─────────────────┐ ┌─────────────────┐ + /// │ SortExec │ │ SortExec │ + /// │ (optional) │ │ (optional) │ + /// └─────────────────┘ └─────────────────┘ + /// ▲ ▲ + /// │ │ + /// │ │ + /// ┌─────────────────────────┐ ┌─────────────────────────┐ + /// │ FilterExec (optional) │ │ FilterExec (optional) │ + /// │ To apply delete preds │ ..... │ To apply delete preds │ + /// │ (Chunk) │ │ (Chunk) │ + /// └─────────────────────────┘ └─────────────────────────┘ + /// ▲ ▲ + /// │ │ + /// │ │ + /// ┌─────────────────┐ ┌─────────────────┐ + /// │IOxReadFilterNode│ │IOxReadFilterNode│ + /// │ (Chunk 1) │ │ (Chunk n) │ + /// └─────────────────┘ └─────────────────┘ ///``` fn build_plans_for_non_duplicates_chunks( table_name: Arc, @@ -924,8 +910,9 @@ impl Deduplicater { ) -> Result>> { let mut plans: Vec> = vec![]; - // output is not required to be sorted or no chunks provided, only create a read filter for all chunks - if output_sort_key.is_empty() || chunks.is_empty() { + // Since each chunk may now carry its own delete predicates, we need to create a plan for each chunk, but + // if there are no chunks, we still need to return a plan + if chunks.is_empty() { plans.push(Arc::new(IOxReadFilterNode::new( Arc::clone(&table_name), output_schema, diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs index cc1095396c..6b02233948 100644 --- a/query_tests/src/scenarios.rs +++ b/query_tests/src/scenarios.rs @@ -1505,11 +1505,6 @@ impl DbSetup for DeleteMultiExprsFromOsOneMeasurementOneChunk { } } -// NGA todo next PR: Add these scenarios after deleted data is eliminated from scan -// 1. Many deletes, each has one or/and multi expressions -// 2. Many different-type chunks when a delete happens -// 3. 
Combination of above - #[derive(Debug)] /// Setup for multi-expression delete query test with one table and one chunk moved from MUB to RUB to OS /// Two deletes at different chunk stages @@ -1549,11 +1544,18 @@ impl DbSetup for TwoDeleteMultiExprsFromMubOneMeasurementOneChunk { .build(); // delete happens when data in MUB - let scenario_mub = make_delete_mub_delete(lp_lines.clone(), pred1.clone(), pred2.clone()).await; + let scenario_mub = + make_delete_mub_delete(lp_lines.clone(), pred1.clone(), pred2.clone()).await; // delete happens when data in MUB then moved to RUB - let scenario_rub = - make_delete_mub_to_rub_delete(lp_lines.clone(), pred1.clone(), pred2.clone(), table_name, partition_key).await; + let scenario_rub = make_delete_mub_to_rub_delete( + lp_lines.clone(), + pred1.clone(), + pred2.clone(), + table_name, + partition_key, + ) + .await; // delete happens when data in MUB then moved to RUB and then persisted let scenario_rub_os = make_delete_mub_to_rub_and_os_delete( @@ -1567,7 +1569,8 @@ impl DbSetup for TwoDeleteMultiExprsFromMubOneMeasurementOneChunk { // delete happens when data in MUB then moved to RUB, then persisted, and then RUB is unloaded let _scenario_os = - make_delete_mub_to_os_delete(lp_lines.clone(), pred1, pred2, table_name, partition_key).await; + make_delete_mub_to_os_delete(lp_lines.clone(), pred1, pred2, table_name, partition_key) + .await; // return scenarios to run queries // NGA todo: BUG in scenario_os @@ -1615,11 +1618,24 @@ impl DbSetup for TwoDeleteMultiExprsFromRubOneMeasurementOneChunk { .build(); // delete happens when data in MUB - let scenario_rub = make_delete_rub_delete(lp_lines.clone(), pred1.clone(), pred2.clone(), table_name, partition_key).await; + let scenario_rub = make_delete_rub_delete( + lp_lines.clone(), + pred1.clone(), + pred2.clone(), + table_name, + partition_key, + ) + .await; // delete happens when data in MUB then moved to RUB - let scenario_rub_delete = - 
make_delete_rub_to_os_delete(lp_lines.clone(), pred1.clone(), pred2.clone(), table_name, partition_key).await; + let scenario_rub_delete = make_delete_rub_to_os_delete( + lp_lines.clone(), + pred1.clone(), + pred2.clone(), + table_name, + partition_key, + ) + .await; // delete happens when data in MUB then moved to RUB and then persisted let _scenario_rub_os = make_delete_rub_to_os_and_unload_rub_delete( @@ -1638,8 +1654,6 @@ impl DbSetup for TwoDeleteMultiExprsFromRubOneMeasurementOneChunk { } } - - #[derive(Debug)] /// Setup for multi-expression delete query test with one table and one chunk in OS pub struct TwoDeleteMultiExprsFromOsOneMeasurementOneChunk {} @@ -1676,10 +1690,15 @@ impl DbSetup for TwoDeleteMultiExprsFromOsOneMeasurementOneChunk { .add_expr(expr3) .build(); - // delete happens after data is persisted but still in RUB - let scenario_rub_os = - make_delete_os_with_rub_delete(lp_lines.clone(), pred1.clone(), pred2.clone(), table_name, partition_key) - .await; + // delete happens after data is persisted but still in RUB + let scenario_rub_os = make_delete_os_with_rub_delete( + lp_lines.clone(), + pred1.clone(), + pred2.clone(), + table_name, + partition_key, + ) + .await; // delete happens after data is persisted but still in RUB and then unload RUB let _scenario_rub_os_unload_rub = make_delete_os_with_rub_then_unload_rub_delete( @@ -1691,8 +1710,8 @@ impl DbSetup for TwoDeleteMultiExprsFromOsOneMeasurementOneChunk { ) .await; - // delete happens after data is persisted and unload RUB - let _scenario_os = make_delete_os_delete( + // delete happens after data is persisted and unload RUB + let _scenario_os = make_delete_os_delete( lp_lines.clone(), pred1.clone(), pred2.clone(), @@ -1704,13 +1723,13 @@ impl DbSetup for TwoDeleteMultiExprsFromOsOneMeasurementOneChunk { // return scenarios to run queries // NGA todo: bug scenario_rub_os_unload_rub, scenario_os vec![scenario_rub_os] - } } +// NGA todo: Add scenarios that have many different types of chunks 
at the same time (MUB, RUB, OS) when one or several deletes happen // ----------------------------------------------------------------------------- -// Helper functions +// Helper functions async fn make_delete_mub(lp_lines: Vec<&str>, pred: Predicate) -> DbScenario { let db = make_db().await.db; // create an open MUB @@ -1731,7 +1750,11 @@ async fn make_delete_mub(lp_lines: Vec<&str>, pred: Predicate) -> DbScenario { } } -async fn make_delete_mub_delete(lp_lines: Vec<&str>, pred1: Predicate, pred2: Predicate) -> DbScenario { +async fn make_delete_mub_delete( + lp_lines: Vec<&str>, + pred1: Predicate, + pred2: Predicate, +) -> DbScenario { let db = make_db().await.db; // create an open MUB write_lp(&db, &lp_lines.join("\n")).await; @@ -1753,7 +1776,8 @@ async fn make_delete_mub_delete(lp_lines: Vec<&str>, pred1: Predicate, pred2: Pr assert_eq!(count_object_store_chunks(&db), 0); DbScenario { - scenario_name: "Deleted data from MUB then move and then delete data from frozen MUB".into(), + scenario_name: "Deleted data from MUB then move and then delete data from frozen MUB" + .into(), db, } } @@ -1814,7 +1838,8 @@ async fn make_delete_mub_to_rub_delete( assert_eq!(count_object_store_chunks(&db), 0); DbScenario { - scenario_name: "Deleted data from MUB, then move to RUB, then delete data from RUB again".into(), + scenario_name: "Deleted data from MUB, then move to RUB, then delete data from RUB again" + .into(), db, } } @@ -1891,7 +1916,9 @@ async fn make_delete_mub_to_rub_and_os_delete( assert_eq!(count_object_store_chunks(&db), 1); DbScenario { - scenario_name: "Deleted data from MUB then move to RUB and OS, then delete from RUB and OS again".into(), + scenario_name: + "Deleted data from MUB then move to RUB and OS, then delete from RUB and OS again" + .into(), db, } } @@ -1967,7 +1994,7 @@ async fn make_delete_mub_to_os_delete( db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) .unwrap(); // delete data from OS after RUb is unloaded - db.delete("cpu", 
Arc::new(pred2)).await.unwrap(); + db.delete("cpu", Arc::new(pred2)).await.unwrap(); // No MUB, no RUB, one OS assert_eq!(count_mutable_buffer_chunks(&db), 0); assert_eq!(count_read_buffer_chunks(&db), 0); @@ -2112,7 +2139,9 @@ async fn make_delete_rub_to_os_delete( assert_eq!(count_object_store_chunks(&db), 1); DbScenario { - scenario_name: "Deleted data in RUB and then persisted to OS then delete once more from RUB and OS".into(), + scenario_name: + "Deleted data in RUB and then persisted to OS then delete once more from RUB and OS" + .into(), db, } } @@ -2195,7 +2224,9 @@ async fn make_delete_rub_to_os_and_unload_rub_delete( assert_eq!(count_object_store_chunks(&db), 1); DbScenario { - scenario_name: "Deleted data in RUB then persisted to OS then RUB unloaded then delete data from OS".into(), + scenario_name: + "Deleted data in RUB then persisted to OS then RUB unloaded then delete data from OS" + .into(), db, } } @@ -2394,7 +2425,6 @@ async fn make_delete_os( assert_eq!(count_mutable_buffer_chunks(&db), 0); assert_eq!(count_read_buffer_chunks(&db), 0); assert_eq!(count_object_store_chunks(&db), 1); - DbScenario { scenario_name: "Deleted data in OS and the delete happens after RUB is unloaded".into(), @@ -2402,7 +2432,6 @@ async fn make_delete_os( } } - async fn make_delete_os_delete( lp_lines: Vec<&str>, pred1: Predicate, @@ -2438,7 +2467,6 @@ async fn make_delete_os_delete( assert_eq!(count_mutable_buffer_chunks(&db), 0); assert_eq!(count_read_buffer_chunks(&db), 0); assert_eq!(count_object_store_chunks(&db), 1); - DbScenario { scenario_name: "Deleted data in OS and the two delete happens after RUB is unloaded".into(),