diff --git a/query_tests/src/scenarios/delete.rs b/query_tests/src/scenarios/delete.rs index b28123e651..f9eac40813 100644 --- a/query_tests/src/scenarios/delete.rs +++ b/query_tests/src/scenarios/delete.rs @@ -5,16 +5,24 @@ use datafusion::logical_plan::{col, lit}; use predicate::predicate::{Predicate, PredicateBuilder}; use async_trait::async_trait; +use query::QueryChunk; +use std::fmt::Display; use std::sync::Arc; use std::time::{Duration, Instant}; use server::db::test_helpers::write_lp; -use server::utils::{ - count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db, -}; +use server::utils::make_db; use super::{DbScenario, DbSetup}; +// #[derive(Debug)] +// /// Set up different test scenarios that delete all rows from different stages of chunk +// pub struct DeleteAllSoftRows {} +// #[async_trait] +// impl DbSetup for DeleteAllSoftRows { + +// } + #[derive(Debug)] /// Setup for delete query test with one table and one chunk moved from MUB to RUB to OS pub struct DeleteFromMubOneMeasurementOneChunk {} @@ -38,28 +46,13 @@ impl DbSetup for DeleteFromMubOneMeasurementOneChunk { .add_expr(expr) .build(); - // delete happens when data in MUB - let scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await; + let preds = vec![Pred { + predicate: &pred, + delete_time: DeleteTime::Mubo, + }]; - // delete happens when data in MUB then moved to RUB - let scenario_rub = - make_delete_mub_to_rub(lp_lines.clone(), pred.clone(), table_name, partition_key).await; - - // delete happens when data in MUB then moved to RUB and then persisted - let scenario_rub_os = make_delete_mub_to_rub_and_os( - lp_lines.clone(), - pred.clone(), - table_name, - partition_key, - ) - .await; - - // delete happens when data in MUB then moved to RUB, then persisted, and then RUB is unloaded - let scenario_os = - make_delete_mub_to_os(lp_lines.clone(), pred, table_name, partition_key).await; - - // return scenarios to run queries - vec![scenario_mub, scenario_rub, scenario_rub_os, scenario_os] + // delete happens when data in open MUB + delete_from_mub(lp_lines, preds, table_name, partition_key).await } } @@ -86,21 +79,13 @@ impl DbSetup for DeleteFromRubOneMeasurementOneChunk { .add_expr(expr) .build(); - // delete happens to data in RUB - let scenario_rub = - make_delete_rub(lp_lines.clone(), pred.clone(), table_name, partition_key).await; + let preds = vec![Pred { + predicate: &pred, + delete_time: DeleteTime::Rub, + }]; - // delete happens to data in RUB then persisted - let scenario_rub_os = - make_delete_rub_to_os(lp_lines.clone(), pred.clone(), table_name, partition_key).await; - - // delete happens to data in RUB then persisted then RUB unloaded - let scenario_os = - make_delete_rub_to_os_and_unload_rub(lp_lines.clone(), pred, table_name, partition_key) - .await; - - // return scenarios to run queries - vec![scenario_rub, scenario_rub_os, scenario_os] + // delete happens when data in RUB + delete_from_rub(lp_lines, preds, table_name, partition_key).await } } @@ -127,24 +112,22 @@ impl DbSetup for DeleteFromOsOneMeasurementOneChunk { .build(); // delete happens after data is persisted but still in RUB - let scenario_rub_os = - make_delete_os_with_rub(lp_lines.clone(), pred.clone(), table_name, partition_key) - .await; + let preds = vec![Pred { + predicate: &pred, + delete_time: DeleteTime::RubOs, + }]; + let mut scenarios = + delete_from_rub_os(lp_lines.clone(), preds, table_name, partition_key).await; - // delete happens after data is persisted but still in RUB and then unload RUB - let scenario_rub_os_unload_rub = make_delete_os_with_rub_then_unload_rub( - lp_lines.clone(), - pred.clone(), - table_name, - partition_key, - ) - .await; + // delete happens after data is persisted and unloaded from RUB + let preds = vec![Pred { + predicate: &pred, + delete_time: DeleteTime::Os, + }]; + let mut scenarios_os = delete_from_os(lp_lines, preds, table_name, partition_key).await; + scenarios.append(&mut scenarios_os); - // delete happens after data is persisted and RUB is unloaded - let scenario_os = make_delete_os(lp_lines.clone(), pred, table_name, partition_key).await; - - // return scenarios to run queries - vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os] + scenarios } } @@ -178,28 +161,13 @@ impl DbSetup for DeleteMultiExprsFromMubOneMeasurementOneChunk { .add_expr(expr2) .build(); - // delete happens when data in MUB - let scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await; + let preds = vec![Pred { + predicate: &pred, + delete_time: DeleteTime::Mubo, + }]; - // delete happens when data in MUB then moved to RUB - let scenario_rub = - make_delete_mub_to_rub(lp_lines.clone(), pred.clone(), table_name, partition_key).await; - - // delete happens when data in MUB then moved to RUB and then persisted - let scenario_rub_os = make_delete_mub_to_rub_and_os( - lp_lines.clone(), - pred.clone(), - table_name, - partition_key, - ) - .await; - - // delete happens when data in MUB then moved to RUB, then persisted, and then RUB is unloaded - let scenario_os = - make_delete_mub_to_os(lp_lines.clone(), pred, table_name, partition_key).await; - - // return scenarios to run queries - vec![scenario_mub, scenario_rub, scenario_rub_os, scenario_os] + // delete happens when data in open MUB + delete_from_mub(lp_lines, preds, table_name, partition_key).await } } @@ -233,21 +201,13 @@ impl DbSetup for DeleteMultiExprsFromRubOneMeasurementOneChunk { .add_expr(expr2) .build(); - // delete happens to data in RUB - let scenario_rub = - make_delete_rub(lp_lines.clone(), pred.clone(), table_name, partition_key).await; + let preds = vec![Pred { + predicate: &pred, + delete_time: DeleteTime::Rub, + }]; - // delete happens to data in RUB then persisted - let scenario_rub_os = - make_delete_rub_to_os(lp_lines.clone(), pred.clone(), table_name, partition_key).await; - - // delete happens to data in RUB then persisted then RUB unloaded - let scenario_os = - make_delete_rub_to_os_and_unload_rub(lp_lines.clone(), pred, table_name, partition_key) - .await; - - // return scenarios to run queries - vec![scenario_rub, scenario_rub_os, scenario_os] + // delete happens when data in RUB + delete_from_rub(lp_lines, preds, table_name, partition_key).await } } @@ -282,24 +242,22 @@ impl DbSetup for DeleteMultiExprsFromOsOneMeasurementOneChunk { .build(); // delete happens after data is persisted but still in RUB - let scenario_rub_os = - make_delete_os_with_rub(lp_lines.clone(), pred.clone(), table_name, partition_key) - .await; + let preds = vec![Pred { + predicate: &pred, + delete_time: DeleteTime::RubOs, + }]; + let mut scenarios = + delete_from_rub_os(lp_lines.clone(), preds, table_name, partition_key).await; - // delete happens after data is persisted but still in RUB and then unload RUB - let scenario_rub_os_unload_rub = make_delete_os_with_rub_then_unload_rub( - lp_lines.clone(), - pred.clone(), - table_name, - partition_key, - ) - .await; + // delete happens after data is persisted and unloaded from RUB + let preds = vec![Pred { + predicate: &pred, + delete_time: DeleteTime::Os, + }]; + let mut scenarios_os = delete_from_os(lp_lines, preds, table_name, partition_key).await; + scenarios.append(&mut scenarios_os); - // delete happens after data is persisted and RUB is unloaded - let scenario_os = make_delete_os(lp_lines.clone(), pred, table_name, partition_key).await; - - // return scenarios to run queries - vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os] + scenarios } } @@ -341,37 +299,20 @@ impl DbSetup for TwoDeleteMultiExprsFromMubOneMeasurementOneChunk { .add_expr(expr3) .build(); - // delete happens when data in MUB - let scenario_mub = - make_delete_mub_delete(lp_lines.clone(), pred1.clone(), pred2.clone()).await; + // delete happens when data in MUB and at end + let preds = vec![ + Pred { + predicate: &pred1, + delete_time: DeleteTime::Mubo, + }, + Pred { + predicate: &pred2, + delete_time: DeleteTime::End, + }, + ]; - // delete happens when data in MUB then moved to RUB - let scenario_rub = make_delete_mub_to_rub_delete( - lp_lines.clone(), - pred1.clone(), - pred2.clone(), - table_name, - partition_key, - ) - .await; - - // delete happens when data in MUB then moved to RUB and then persisted - let scenario_rub_os = make_delete_mub_to_rub_and_os_delete( - lp_lines.clone(), - pred1.clone(), - pred2.clone(), - table_name, - partition_key, - ) - .await; - - // delete happens when data in MUB then moved to RUB, then persisted, and then RUB is unloaded - let scenario_os = - make_delete_mub_to_os_delete(lp_lines.clone(), pred1, pred2, table_name, partition_key) - .await; - - // return scenarios to run queries - vec![scenario_mub, scenario_rub, scenario_rub_os, scenario_os] + // delete happens when data in open MUB + delete_from_mub(lp_lines, preds, table_name, partition_key).await } } @@ -413,38 +354,19 @@ impl DbSetup for TwoDeleteMultiExprsFromRubOneMeasurementOneChunk { .add_expr(expr3) .build(); - // delete happens when data in MUB - let scenario_rub = make_delete_rub_delete( - lp_lines.clone(), - pred1.clone(), - pred2.clone(), - table_name, - partition_key, - ) - .await; + let preds = vec![ + Pred { + predicate: &pred1, + delete_time: DeleteTime::Rub, + }, + Pred { + predicate: &pred2, + delete_time: DeleteTime::End, + }, + ]; - // delete happens when data in MUB then moved to RUB - let scenario_rub_delete = make_delete_rub_to_os_delete( - lp_lines.clone(), - pred1.clone(), - pred2.clone(), - table_name, - partition_key, - ) - .await; - - // delete happens when data in MUB then moved to RUB and then persisted - let scenario_rub_os = make_delete_rub_to_os_and_unload_rub_delete( - lp_lines.clone(), - pred1.clone(), - pred2.clone(), - table_name, - partition_key, - ) - .await; - - // return scenarios to run queries - vec![scenario_rub, scenario_rub_delete, scenario_rub_os] + // delete happens when data in RUB and at end + delete_from_rub(lp_lines, preds, table_name, partition_key).await } } @@ -485,40 +407,36 @@ impl DbSetup for TwoDeleteMultiExprsFromOsOneMeasurementOneChunk { .build(); // delete happens after data is persisted but still in RUB - let scenario_rub_os = make_delete_os_with_rub_delete( - lp_lines.clone(), - pred1.clone(), - pred2.clone(), - table_name, - partition_key, - ) - .await; + let preds = vec![ + Pred { + predicate: &pred1, + delete_time: DeleteTime::RubOs, + }, + Pred { + predicate: &pred2, + delete_time: DeleteTime::End, + }, + ]; + let mut scenarios = + delete_from_rub_os(lp_lines.clone(), preds, table_name, partition_key).await; - // delete happens after data is persisted but still in RUB and then unload RUB - let scenario_rub_os_unload_rub = make_delete_os_with_rub_then_unload_rub_delete( - lp_lines.clone(), - pred1.clone(), - pred2.clone(), - table_name, - partition_key, - ) - .await; + // delete happens after data is persisted and unloaded from RUB + let preds = vec![ + Pred { + predicate: &pred1, + delete_time: DeleteTime::Os, + }, + Pred { + predicate: &pred2, + delete_time: DeleteTime::End, + }, + ]; + let mut scenarios_os = delete_from_os(lp_lines, preds, table_name, partition_key).await; + scenarios.append(&mut scenarios_os); - // delete happens after data is persisted and unload RUB - let scenario_os = make_delete_os_delete( - lp_lines.clone(), - pred1.clone(), - pred2.clone(), - table_name, - partition_key, - ) - .await; - - // return scenarios to run queries - vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os] + scenarios } } - // Three different delete on three different chunks #[derive(Debug)] /// Setup for three different delete on three different chunks @@ -579,31 +497,146 @@ impl DbSetup for ThreeDeleteThreeChunks { .add_expr(expr) .build(); - let lp_data = vec![lp_lines_1, lp_lines_2, lp_lines_3]; - let preds = vec![pred1, pred2, pred3]; - + // ---------------------- // 3 chunks: MUB, RUB, OS - let scenario_mub_rub_os = - make_mub_rub_os_deletes(&lp_data, &preds, table_name, partition_key).await; + let lp = vec![ + ChunkData { + lp_lines: lp_lines_1.clone(), + chunk_type: ChunkType::Os, + }, + ChunkData { + lp_lines: lp_lines_2.clone(), + chunk_type: ChunkType::Rub, + }, + ChunkData { + lp_lines: lp_lines_3.clone(), + chunk_type: ChunkType::Mubo, + }, + ]; + let preds = vec![&pred1, &pred2, &pred3]; + let scenario_mub_rub_os = make_different_stage_chunks_with_deletes_scenario( + lp, + preds.clone(), + table_name, + partition_key, + ) + .await; - // 3 chunks: 2 MUB, 1 RUB - let scenario_2mub_rub = - make_2mub_rub_deletes(&lp_data, &preds, table_name, partition_key).await; + // ---------------------- + // 3 chunks: 1 MUB open, 1 MUB frozen, 1 RUB + let lp = vec![ + ChunkData { + lp_lines: lp_lines_1.clone(), + chunk_type: ChunkType::Rub, + }, + ChunkData { + lp_lines: lp_lines_2.clone(), + chunk_type: ChunkType::Mubf, + }, + ChunkData { + lp_lines: lp_lines_3.clone(), + chunk_type: ChunkType::Mubo, + }, + ]; + let scenario_2mub_rub = make_different_stage_chunks_with_deletes_scenario( + lp, + preds.clone(), + table_name, + partition_key, + ) + .await; + // ---------------------- // 3 chunks: 2 MUB, 1 OS - let scenario_2mub_os = - make_2mub_os_deletes(&lp_data, &preds, table_name, partition_key).await; + let lp = vec![ + ChunkData { + lp_lines: lp_lines_1.clone(), + chunk_type: ChunkType::Os, + }, + ChunkData { + lp_lines: lp_lines_2.clone(), + chunk_type: ChunkType::Mubf, + }, + ChunkData { + lp_lines: lp_lines_3.clone(), + chunk_type: ChunkType::Mubo, + }, + ]; + let scenario_2mub_os = make_different_stage_chunks_with_deletes_scenario( + lp, + preds.clone(), + table_name, + partition_key, + ) + .await; + // ---------------------- // 3 chunks: 2 RUB, 1 OS - let scenario_2rub_os = - make_2rub_os_deletes(&lp_data, &preds, table_name, partition_key).await; + let lp = vec![ + ChunkData { + lp_lines: lp_lines_1.clone(), + chunk_type: ChunkType::Os, + }, + ChunkData { + lp_lines: lp_lines_2.clone(), + chunk_type: ChunkType::Rub, + }, + ChunkData { + lp_lines: lp_lines_3.clone(), + chunk_type: ChunkType::Rub, + }, + ]; + let scenario_2rub_os = make_different_stage_chunks_with_deletes_scenario( + lp, + preds.clone(), + table_name, + partition_key, + ) + .await; + // ---------------------- // 3 chunks: RUB, 2 OS - let scenario_rub_2os = - make_rub_2os_deletes(&lp_data, &preds, table_name, partition_key).await; + let lp = vec![ + ChunkData { + lp_lines: lp_lines_1.clone(), + chunk_type: ChunkType::Os, + }, + ChunkData { + lp_lines: lp_lines_2.clone(), + chunk_type: ChunkType::Os, + }, + ChunkData { + lp_lines: lp_lines_3.clone(), + chunk_type: ChunkType::Rub, + }, + ]; + let scenario_rub_2os = make_different_stage_chunks_with_deletes_scenario( + lp, + preds.clone(), + table_name, + partition_key, + ) + .await; + // ---------------------- // 3 chunks: 3 OS - let scenario_3os = make_3os_deletes(&lp_data, &preds, table_name, partition_key).await; + let lp = vec![ + ChunkData { + lp_lines: lp_lines_1, + chunk_type: ChunkType::Os, + }, + ChunkData { + lp_lines: lp_lines_2, + chunk_type: ChunkType::Os, + }, + ChunkData { + lp_lines: lp_lines_3, + chunk_type: ChunkType::Os, + }, + ]; + let scenario_3os = + make_different_stage_chunks_with_deletes_scenario(lp, preds, table_name, partition_key) + .await; // return scenarios to run queries vec![ @@ -618,1096 +651,438 @@ impl DbSetup for ThreeDeleteThreeChunks { } // ----------------------------------------------------------------------------- -// Helper functions -async fn make_delete_mub(lp_lines: Vec<&str>, pred: Predicate) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // One open MUB, no RUB, no OS - assert_eq!(count_mutable_buffer_chunks(&db), 1); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 0); - db.delete("cpu", Arc::new(pred)).await.unwrap(); - // Still one but frozen MUB, no RUB, no OS - assert_eq!(count_mutable_buffer_chunks(&db), 1); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 0); +// Helper structs and functions - DbScenario { - scenario_name: "Deleted data in MUB".into(), - db, +#[derive(Debug, Clone)] +pub struct ChunkData<'a> { + lp_lines: Vec<&'a str>, + chunk_type: ChunkType, +} + +#[derive(Debug, Clone)] +pub enum ChunkType { + /// Open MUB + Mubo, + /// Frozen MUB + Mubf, + Rub, + RubOs, + Os, +} + +impl Display for ChunkType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ChunkType::Mubo => write!(f, "Open MUB"), + ChunkType::Mubf => write!(f, "Frozen MUB"), + ChunkType::Rub => write!(f, "RUB"), + ChunkType::RubOs => write!(f, "RUB & OS"), + ChunkType::Os => write!(f, "OS"), + } } } -async fn make_delete_mub_delete( +#[derive(Debug, Clone)] +pub struct Pred<'a> { + predicate: &'a Predicate, + delete_time: DeleteTime, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum DeleteTime { + /// delete predicate happens after all chunks created + /// and moved to their corresponding stages + End, + + /// delete predicate applies to chunks at their Mub Open stage + Mubo, + + /// delete predicate applies to chunks at their Mub Frozen stage + Mubf, + + /// delete predicate applies to chunks at their Rub stage + Rub, + + /// delete predicate applies to chunks at their Rub & Os stage + RubOs, + + /// delete predicate applies to chunks at their Os stage + Os, +} + +/// Build a chunk that may move with life cycle before/after deletes +/// Note the chunk in this function can be moved to different stages and delete predicates +/// can be applied at different stages when the chunk is moved. +async fn make_chunk_with_deletes_at_different_stages( lp_lines: Vec<&str>, - pred1: Predicate, - pred2: Predicate, + chunk_type: ChunkType, + preds: Vec>, + table_name: &str, + partition_key: &str, ) -> DbScenario { let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // One open MUB, no RUB, no OS - assert_eq!(count_mutable_buffer_chunks(&db), 1); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 0); - // delete from MUB - db.delete("cpu", Arc::new(pred1)).await.unwrap(); - // Still one but frozen MUB, no RUB, no OS - assert_eq!(count_mutable_buffer_chunks(&db), 1); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 0); - // delete from frozen MUB - db.delete("cpu", Arc::new(pred2)).await.unwrap(); - // Still one frozen MUB, no RUB, no OS - assert_eq!(count_mutable_buffer_chunks(&db), 1); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 0); - DbScenario { - scenario_name: "Deleted data from MUB then move and then delete data from frozen MUB" - .into(), - db, + // Make an open MUB + write_lp(&db, &lp_lines.join("\n")).await; + // 0 does not represent the real chunk id. It is here just to initialize the chunk_id variable for later assignment + let mut chunk_id = ChunkId::new(0); + // Apply delete predicate + let mut deleted = false; + let mut display = "".to_string(); + let mut count = 0; + for pred in &preds { + if pred.delete_time == DeleteTime::Mubo { + db.delete(table_name, Arc::new(pred.predicate.clone())) + .await + .unwrap(); + deleted = true; + count += 1; + } } + if count > 0 { + display.push_str(format!(", with {} deletes from open MUB", count).as_str()); + } + + // Freeze MUB is requested + // Since mub are frozen at delete, no need to do it in that case + if !deleted { + match chunk_type { + ChunkType::Mubf | ChunkType::Rub | ChunkType::RubOs | ChunkType::Os => { + let chunk = db + .rollover_partition(table_name, partition_key) + .await + .unwrap() + .unwrap(); + chunk_id = chunk.id(); + } + _ => {} + } + } + // Apply delete predicate + count = 0; + for pred in &preds { + if pred.delete_time == DeleteTime::Mubf { + db.delete(table_name, Arc::new(pred.predicate.clone())) + .await + .unwrap(); + count += 1; + } + } + if count > 0 { + display.push_str(format!(", with {} deletes from frozen MUB", count).as_str()); + } + + // Move MUB to RUB + match chunk_type { + ChunkType::Rub | ChunkType::RubOs | ChunkType::Os => { + let chunk = db + .move_chunk_to_read_buffer(table_name, partition_key, chunk_id) + .await + .unwrap(); + chunk_id = chunk.id(); + } + _ => {} + } + // Apply delete predicate + count = 0; + for pred in &preds { + if pred.delete_time == DeleteTime::Rub { + db.delete(table_name, Arc::new(pred.predicate.clone())) + .await + .unwrap(); + count += 1; + } + } + if count > 0 { + display.push_str(format!(", with {} deletes from RUB", count).as_str()); + } + + // Move RUB to OS + match chunk_type { + ChunkType::RubOs | ChunkType::Os => { + let chunk = db + .persist_partition( + table_name, + partition_key, + Instant::now() + Duration::from_secs(1), + ) + .await + .unwrap(); + chunk_id = chunk.id(); + } + _ => {} + } + // Apply delete predicate + count = 0; + for pred in &preds { + if pred.delete_time == DeleteTime::RubOs { + db.delete(table_name, Arc::new(pred.predicate.clone())) + .await + .unwrap(); + count = 1; + } + } + if count > 0 { + display.push_str(format!(", with {} deletes from RUB & OS", count).as_str()); + } + + // Unload RUB + if let ChunkType::Os = chunk_type { + db.unload_read_buffer(table_name, partition_key, chunk_id) + .unwrap(); + } + // Apply delete predicate + count = 0; + for pred in &preds { + if pred.delete_time == DeleteTime::Os || pred.delete_time == DeleteTime::End { + db.delete(table_name, Arc::new(pred.predicate.clone())) + .await + .unwrap(); + count += 1; + } + } + if count > 0 { + display.push_str( + format!( + ", with {} deletes from OS or after all chunks created", + count + ) + .as_str(), + ); + } + + let scenario_name = format!("Deleted data from one chunk{}", display); + DbScenario { scenario_name, db } } -async fn make_delete_mub_to_rub( +/// Build many chunks which are in different stages +// Note that, after a lot of thoughts, I decided to have 2 separated functionsn this one and the one above. +// The above tests delete predicates before and/or after a chunk is moved to different stages, while +// this function tests different-stage chunks in different stages when one or many deletes happen. +// Even though these 2 functions have some overlapped code, merging them in one +// function will created a much more complicated cases to handle +async fn make_different_stage_chunks_with_deletes_scenario( + data: Vec>, + preds: Vec<&Predicate>, + table_name: &str, + partition_key: &str, +) -> DbScenario { + let db = make_db().await.db; + let mut display = "".to_string(); + + //build_different_stage_chunks(&db, data, ChunkType::Mubo, table_name, partition_key).await; + for chunk_data in &data { + display.push_str(" - "); + display.push_str(&chunk_data.chunk_type.to_string()); + // Make an open MUB + write_lp(&db, &chunk_data.lp_lines.join("\n")).await; + // 0 does not represent the real chunk id. It is here just to initialize the chunk_id variable for later assignment + let mut chunk_id = ChunkId::new(0); + + // freeze MUB + match chunk_data.chunk_type { + ChunkType::Mubf | ChunkType::Rub | ChunkType::RubOs | ChunkType::Os => { + let chunk = db + .rollover_partition(table_name, partition_key) + .await + .unwrap() + .unwrap(); + chunk_id = chunk.id(); + } + _ => {} + } + + // Move MUB to RUB + match chunk_data.chunk_type { + ChunkType::Rub | ChunkType::RubOs | ChunkType::Os => { + let chunk = db + .move_chunk_to_read_buffer(table_name, partition_key, chunk_id) + .await + .unwrap(); + chunk_id = chunk.id(); + } + _ => {} + } + + // Move RUB to OS + match chunk_data.chunk_type { + ChunkType::RubOs | ChunkType::Os => { + let chunk = db + .persist_partition( + table_name, + partition_key, + Instant::now() + Duration::from_secs(1), + ) + .await + .unwrap(); + chunk_id = chunk.id(); + } + _ => {} + } + + // Unload RUB + if let ChunkType::Os = chunk_data.chunk_type { + db.unload_read_buffer(table_name, partition_key, chunk_id) + .unwrap(); + } + } + + for pred in &preds { + db.delete(table_name, Arc::new((*pred).clone())) + .await + .unwrap(); + } + + let scenario_name = format!( + "Deleted data from {} chunks, {}, with {} deletes after all chunks are created", + data.len(), + display, + preds.len() + ); + DbScenario { scenario_name, db } +} + +async fn delete_from_mub( lp_lines: Vec<&str>, - pred: Predicate, + preds: Vec>, table_name: &str, partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // delete data in MUB - db.delete("cpu", Arc::new(pred)).await.unwrap(); - // move MUB to RUB and the delete predicate will be automatically included in RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // No MUB, one RUB, no OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 0); +) -> Vec { + // delete happens when data in open MUB + let scenario_mub = make_chunk_with_deletes_at_different_stages( + lp_lines.clone(), + ChunkType::Mubo, + preds.clone(), + table_name, + partition_key, + ) + .await; - DbScenario { - scenario_name: "Deleted data in RUB moved from MUB".into(), - db, - } + // delete happens when data in open MUB then moved to RUB + let scenario_mub_to_rub = make_chunk_with_deletes_at_different_stages( + lp_lines.clone(), + ChunkType::Rub, + preds.clone(), + table_name, + partition_key, + ) + .await; + + // delete happens when data in open MUB then moved to RUB and then persisted + let scenario_mub_to_rub_os = make_chunk_with_deletes_at_different_stages( + lp_lines.clone(), + ChunkType::RubOs, + preds.clone(), + table_name, + partition_key, + ) + .await; + + // delete happens when data in MUB then moved to RUB, then persisted, and then RUB is unloaded + let scenario_mub_to_os = make_chunk_with_deletes_at_different_stages( + lp_lines.clone(), + ChunkType::Os, + preds.clone(), + table_name, + partition_key, + ) + .await; + + // return scenarios to run queries + vec![ + scenario_mub, + scenario_mub_to_rub, + scenario_mub_to_rub_os, + scenario_mub_to_os, + ] } -async fn make_delete_mub_to_rub_delete( +async fn delete_from_rub( lp_lines: Vec<&str>, - pred1: Predicate, - pred2: Predicate, + preds: Vec>, table_name: &str, partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // delete data from MUB - db.delete("cpu", Arc::new(pred1)).await.unwrap(); - // move MUB to RUB and the delete predicate will be automatically included in RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // delete data from RUB - db.delete("cpu", Arc::new(pred2)).await.unwrap(); - // No MUB, one RUB, no OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 0); +) -> Vec { + // delete happens when data in RUB + let scenario_rub = make_chunk_with_deletes_at_different_stages( + lp_lines.clone(), + ChunkType::Rub, + preds.clone(), + table_name, + partition_key, + ) + .await; - DbScenario { - scenario_name: "Deleted data from MUB, then move to RUB, then delete data from RUB again" - .into(), - db, - } + // delete happens to data in RUB then persisted + let scenario_rub_to_rub_os = make_chunk_with_deletes_at_different_stages( + lp_lines.clone(), + ChunkType::RubOs, + preds.clone(), + table_name, + partition_key, + ) + .await; + + // delete happens to data in RUB then persisted then RUB unloaded + let scenario_rub_to_os = make_chunk_with_deletes_at_different_stages( + lp_lines.clone(), + ChunkType::Os, + preds.clone(), + table_name, + partition_key, + ) + .await; + + // return scenarios to run queries + vec![scenario_rub, scenario_rub_to_rub_os, scenario_rub_to_os] } -async fn make_delete_mub_to_rub_and_os( +async fn delete_from_rub_os( lp_lines: Vec<&str>, - pred: Predicate, + preds: Vec>, table_name: &str, partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // delete data in MUB - db.delete("cpu", Arc::new(pred)).await.unwrap(); - // move MUB to RUB and the delete predicate will be automatically included in RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( +) -> Vec { + // delete happens after data is persisted but still in RUB + let scenario_rub_os = make_chunk_with_deletes_at_different_stages( + lp_lines.clone(), + ChunkType::RubOs, + preds.clone(), table_name, partition_key, - Instant::now() + Duration::from_secs(1), ) - .await - .unwrap(); - // No MUB, one RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 1); + .await; - DbScenario { - scenario_name: "Deleted data in RUB and OS".into(), - db, - } + // delete happens after data is persisted but still in RUB and then unload RUB + let scenario_rub_os_to_os = make_chunk_with_deletes_at_different_stages( + lp_lines.clone(), + ChunkType::Os, + preds.clone(), + table_name, + partition_key, + ) + .await; + + // return scenarios to run queries + vec![scenario_rub_os, scenario_rub_os_to_os] } -async fn make_delete_mub_to_rub_and_os_delete( +async fn delete_from_os( lp_lines: Vec<&str>, - pred1: Predicate, - pred2: Predicate, + preds: Vec>, table_name: &str, partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // delete data in MUB - db.delete("cpu", Arc::new(pred1)).await.unwrap(); - // move MUB to RUB and the delete predicate will be automatically included in RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( +) -> Vec { + // delete happens after data is persisted and RUB is unloaded + let scenario_os = make_chunk_with_deletes_at_different_stages( + lp_lines.clone(), + ChunkType::Os, + preds.clone(), table_name, partition_key, - Instant::now() + Duration::from_secs(1), ) - .await - .unwrap(); - // delete from RUB and OS - db.delete("cpu", Arc::new(pred2)).await.unwrap(); - // No MUB, one RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 1); + .await; - DbScenario { - scenario_name: - "Deleted data from MUB then move to RUB and OS, then delete from RUB and OS again" - .into(), - db, - } -} - -async fn make_delete_mub_to_os( - lp_lines: Vec<&str>, - pred: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // delete data in MUB - db.delete("cpu", Arc::new(pred)).await.unwrap(); - // move MUB to RUB and the delete predicate will be automatically included in RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // remove RUB - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - // No MUB, no RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: "Deleted data in OS".into(), - db, - } -} - -async fn make_delete_mub_to_os_delete( - lp_lines: Vec<&str>, - pred1: Predicate, - pred2: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // delete data from MUB - db.delete("cpu", Arc::new(pred1)).await.unwrap(); - // move MUB to RUB and the delete predicate will be automatically included in RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // remove RUB - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - // delete data from OS after RUb is unloaded - db.delete("cpu", Arc::new(pred2)).await.unwrap(); - // No MUB, no RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: "Deleted data from MUB, then move to RUB to OS, then unload RUB, and delete from OS again".into(), - db, - } -} - -async fn make_delete_rub( - lp_lines: Vec<&str>, - pred: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // delete data in RUB - db.delete("cpu", Arc::new(pred)).await.unwrap(); - // No MUB, one RUB, no OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 0); - - DbScenario { - scenario_name: "Deleted data in RUB".into(), - db, - } -} - -async fn make_delete_rub_delete( - lp_lines: Vec<&str>, - pred1: Predicate, - pred2: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // delete data from RUB - db.delete("cpu", Arc::new(pred1)).await.unwrap(); - // delete data from RUB again - db.delete("cpu", Arc::new(pred2)).await.unwrap(); - // No MUB, one RUB, no OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 0); - - DbScenario { - scenario_name: "Deleted data from RUB twice ".into(), - db, - } -} - -async fn make_delete_rub_to_os( - lp_lines: Vec<&str>, - pred: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // delete data in RUB - db.delete("cpu", Arc::new(pred)).await.unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // No MUB, one RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: "Deleted data in RUB and then persisted to OS".into(), - db, - } -} - -async fn make_delete_rub_to_os_delete( - lp_lines: Vec<&str>, - pred1: Predicate, - pred2: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // delete data from RUB - db.delete("cpu", Arc::new(pred1)).await.unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // delete data from RUB & OS - db.delete("cpu", Arc::new(pred2)).await.unwrap(); - // No MUB, one RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: - "Deleted data in RUB and then persisted to OS then delete once more from RUB and OS" - .into(), - db, - } -} - -async fn make_delete_rub_to_os_and_unload_rub( - lp_lines: Vec<&str>, - pred: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // delete data in RUB - db.delete("cpu", Arc::new(pred)).await.unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // remove RUB - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - // No MUB, no RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: "Deleted data in RUB then persisted to OS then RUB unloaded".into(), - db, - } -} - -async fn make_delete_rub_to_os_and_unload_rub_delete( - lp_lines: Vec<&str>, - pred1: Predicate, - pred2: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // delete data from RUB - db.delete("cpu", Arc::new(pred1)).await.unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // remove RUB - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - // delete data from OS - db.delete("cpu", Arc::new(pred2)).await.unwrap(); - // No MUB, no RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: - "Deleted data in RUB then persisted to OS then RUB unloaded then delete data from OS" - .into(), - db, - } -} - -async fn make_delete_os_with_rub( - lp_lines: Vec<&str>, - pred: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // delete data after persisted but RUB still available - db.delete("cpu", Arc::new(pred)).await.unwrap(); - // No MUB, one RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: "Deleted data in OS with RUB".into(), - db, - } -} - -async fn make_delete_os_with_rub_delete( - lp_lines: Vec<&str>, - pred1: Predicate, - pred2: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // delete data after persisted but RUB still available - db.delete("cpu", Arc::new(pred1)).await.unwrap(); - db.delete("cpu", Arc::new(pred2)).await.unwrap(); - // No MUB, one RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: "Delete twice from OS with RUB".into(), - db, - } -} - -async fn make_delete_os_with_rub_then_unload_rub( - lp_lines: Vec<&str>, - pred: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // delete data after persisted but RUB still available - db.delete("cpu", Arc::new(pred)).await.unwrap(); - // remove RUB - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - // No MUB, no RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: "Deleted data in OS only but the delete happens before RUB is unloaded" - .into(), - db, - } -} - -async fn make_delete_os_with_rub_then_unload_rub_delete( - lp_lines: Vec<&str>, - pred1: Predicate, - pred2: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // delete data after persisted but RUB still available - db.delete("cpu", Arc::new(pred1)).await.unwrap(); - // remove RUB - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - // delete again - db.delete("cpu", Arc::new(pred2)).await.unwrap(); - // No MUB, no RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: "Deleted data in OS only but the delete happens before RUB is unloaded, then delete one more" - .into(), - db, - } -} - -async fn make_delete_os( - lp_lines: Vec<&str>, - pred: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // remove RUB - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - // delete data after persisted but RUB still available - db.delete("cpu", Arc::new(pred)).await.unwrap(); - // No MUB, no RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: "Deleted data in OS and the delete happens after RUB is unloaded".into(), - db, - } -} - -async fn make_delete_os_delete( - lp_lines: Vec<&str>, - pred1: Predicate, - pred2: Predicate, - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - // create an open MUB - write_lp(&db, &lp_lines.join("\n")).await; - // move MUB to RUB - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - // persist RUB and the delete predicate will be automatically included in the OS chunk - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - // remove RUB - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - // delete data after persisted but RUB still available - db.delete("cpu", Arc::new(pred1)).await.unwrap(); - db.delete("cpu", Arc::new(pred2)).await.unwrap(); - // No MUB, no RUB, one OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 1); - - DbScenario { - scenario_name: "Deleted data in OS and the two delete happens after RUB is unloaded".into(), - db, - } -} - -async fn make_mub_rub_os_deletes( - lp_data: &[Vec<&str>], - preds: &[Predicate], - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - - // chunk 1 is an OS chunk - write_lp(&db, &lp_data[0].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - - // Chunk 2 is a RUB - write_lp(&db, &lp_data[1].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(2)) - .await - .unwrap(); - - // Chunk 3 is a MUB - write_lp(&db, &lp_data[2].join("\n")).await; - - // 1 MUB, 1 RUB, 1 OS - assert_eq!(count_mutable_buffer_chunks(&db), 1); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 1); - - // Let issue 3 deletes - db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap(); - - DbScenario { - scenario_name: "Deleted data from MUB, RUB, and OS".into(), - db, - } -} - -async fn make_2mub_rub_deletes( - lp_data: &[Vec<&str>], - preds: &[Predicate], - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - - // Chunk 1 is a RUB - write_lp(&db, &lp_data[0].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - - // Chunk 2 is an frozen MUB and chunk 3 is an open MUB - write_lp(&db, &lp_data[1].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - write_lp(&db, &lp_data[2].join("\n")).await; - - // 3 MUB, 1 RUB, 0 OS - assert_eq!(count_mutable_buffer_chunks(&db), 2); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 0); - - // Let issue 3 deletes - db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap(); - - DbScenario { - scenario_name: "Deleted data from 2 MUB, 1 RUB".into(), - db, - } -} - -async fn make_2mub_os_deletes( - lp_data: &[Vec<&str>], - preds: &[Predicate], - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - - // chunk 1 is an OS chunk - write_lp(&db, &lp_data[0].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - - // Chunk 2 is an frozen MUB and chunk 3 is an open MUB - write_lp(&db, &lp_data[1].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - write_lp(&db, &lp_data[2].join("\n")).await; - - // 2 MUB, 1 OS - assert_eq!(count_mutable_buffer_chunks(&db), 2); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 1); - - // Let issue 3 deletes - db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap(); - - DbScenario { - scenario_name: "Deleted data from 2 MUB, 1 OS".into(), - db, - } -} - -async fn make_2rub_os_deletes( - lp_data: &[Vec<&str>], - preds: &[Predicate], - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - - // chunk 1 is an OS chunk - write_lp(&db, &lp_data[0].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - - // Chunk 2 and 3 are RUBss - write_lp(&db, &lp_data[1].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(2)) - .await - .unwrap(); - - write_lp(&db, &lp_data[2].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(3)) - .await - .unwrap(); - - // 2 RUB, 1 OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 2); - assert_eq!(count_object_store_chunks(&db), 1); - - // Let issue 3 deletes - db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap(); - - DbScenario { - scenario_name: "Deleted data from 2 RUB, 1 OS".into(), - db, - } -} - -async fn make_rub_2os_deletes( - lp_data: &[Vec<&str>], - preds: &[Predicate], - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - - // chunk 1 and 2 are OS - write_lp(&db, &lp_data[0].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - - // Chunk 2 - write_lp(&db, &lp_data[1].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(2)) - .await - .unwrap(); - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - db.unload_read_buffer(table_name, partition_key, ChunkId::new(3)) - .unwrap(); - - // Chunk 3 are RUB - write_lp(&db, &lp_data[2].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(4)) - .await - .unwrap(); - - // 1 RUB, 2 OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 1); - assert_eq!(count_object_store_chunks(&db), 2); - - // Let issue 3 deletes - db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap(); - - DbScenario { - scenario_name: "Deleted data from 1 RUB, 2 OS".into(), - db, - } -} - -async fn make_3os_deletes( - lp_data: &[Vec<&str>], - preds: &[Predicate], - table_name: &str, - partition_key: &str, -) -> DbScenario { - let db = make_db().await.db; - - // All 3 chunks are OS - write_lp(&db, &lp_data[0].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0)) - .await - .unwrap(); - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - db.unload_read_buffer(table_name, partition_key, ChunkId::new(1)) - .unwrap(); - - // Chunk 2 - write_lp(&db, &lp_data[1].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(2)) - .await - .unwrap(); - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - db.unload_read_buffer(table_name, partition_key, ChunkId::new(3)) - .unwrap(); - - // Chunk 3 - write_lp(&db, &lp_data[2].join("\n")).await; - db.rollover_partition(table_name, partition_key) - .await - .unwrap(); - db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(4)) - .await - .unwrap(); - db.persist_partition( - table_name, - partition_key, - Instant::now() + Duration::from_secs(1), - ) - .await - .unwrap(); - db.unload_read_buffer(table_name, partition_key, ChunkId::new(5)) - .unwrap(); - - // 3 OS - assert_eq!(count_mutable_buffer_chunks(&db), 0); - assert_eq!(count_read_buffer_chunks(&db), 0); - assert_eq!(count_object_store_chunks(&db), 3); - - // Let issue 3 deletes - db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap(); - db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap(); - - DbScenario { - scenario_name: "Deleted data from 3 OS".into(), - db, - } + // return scenarios to run queries + vec![scenario_os] } diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index baab295f1f..d32973403a 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -845,7 +845,7 @@ async fn sql_select_all_different_tags_chunks() { } #[tokio::test] -async fn sql_select_with_deleted_data_from_one_expr() { +async fn sql_select_with_delete_from_one_expr() { let expected = vec![ "+-----+--------------------------------+", "| bar | time |", @@ -880,7 +880,7 @@ async fn sql_select_with_deleted_data_from_one_expr() { } #[tokio::test] -async fn sql_select_with_deleted_data_from_multi_exprs() { +async fn sql_select_with_delete_from_multi_exprs() { let expected = vec![ "+-----+-----+--------------------------------+", "| bar | foo | time |", @@ -916,7 +916,7 @@ async fn sql_select_with_deleted_data_from_multi_exprs() { } #[tokio::test] -async fn sql_select_with_two_deleted_data_from_multi_exprs() { +async fn sql_select_with_two_deletes_from_multi_exprs() { let expected = vec![ "+-----+-----+--------------------------------+", "| bar | foo | time |",