fix: all chunks now are applied delete predicates during scan

pull/24376/head
Nga Tran 2021-09-21 12:17:59 -04:00
parent 85989cc8a3
commit 93551bdd1e
2 changed files with 89 additions and 74 deletions

View File

@ -742,11 +742,11 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
/// ▲
/// │
/// │
/// ┌───────────────────────
/// │ FilterExec
/// | To apply delete preds
/// │ (Chunk)
/// └───────────────────────
/// ┌─────────────────────────
/// │ FilterExec (optional)
/// | To apply delete preds
/// │ (Chunk)
/// └─────────────────────────
/// ▲
/// │
/// │
@ -861,19 +861,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
}
/// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
/// ```text
/// ┌─────────────────┐
/// │ SortExec │
/// │ (optional) │ <-- Only added if the input output_sort_key is not empty
/// └─────────────────┘
/// ▲
/// │
/// │
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk) │
/// └─────────────────┘
///```
// And some optional operators on top such as applying delete predicates or sort the chunk
fn build_plan_for_non_duplicates_chunk(
table_name: Arc<str>,
output_schema: Arc<Schema>,
@ -890,30 +878,28 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
)
}
/// Return either:
/// the simplest IOx scan plan for many chunks which is IOxReadFilterNode
/// if the input output_sort_key is empty
/// ```text
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Many Chunks) │
/// └─────────────────┘
///```
///
/// Otherwise, many plans like this
/// Return
///
/// ```text
/// ┌─────────────────┐ ┌─────────────────┐
/// │ SortExec │ │ SortExec │
/// │ (optional) │ │ (optional) │
/// └─────────────────┘ └─────────────────┘
/// ▲ ▲
/// │ ..... │
/// │ │
/// ┌─────────────────┐ ┌─────────────────┐
/// │IOxReadFilterNode│ │IOxReadFilterNode│
/// │ (Chunk 1) │ │ (Chunk n) │
/// └─────────────────┘ └─────────────────┘
/// ┌─────────────────┐ ┌─────────────────┐
/// │ SortExec │ │ SortExec │
/// │ (optional) │ │ (optional) │
/// └─────────────────┘ └─────────────────┘
/// ▲ ▲
/// │ │
/// │
/// ┌─────────────────────────┐ ┌─────────────────────────┐
/// │ FilterExec (optional) │ │ FilterExec (optional) │
/// | To apply delete preds │ ..... | To apply delete preds │
/// │ (Chunk) │ │ (Chunk) │
/// └─────────────────────────┘ └─────────────────────────┘
/// ▲ ▲
/// │ │
/// │ │
/// ┌─────────────────┐ ┌─────────────────┐
/// │IOxReadFilterNode│ │IOxReadFilterNode│
/// │ (Chunk 1) │ │ (Chunk n) │
/// └─────────────────┘ └─────────────────┘
///```
fn build_plans_for_non_duplicates_chunks(
table_name: Arc<str>,
@ -924,8 +910,9 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
) -> Result<Vec<Arc<dyn ExecutionPlan>>> {
let mut plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
// output is not required to be sorted or no chunks provided, only create a read filter for all chunks
if output_sort_key.is_empty() || chunks.is_empty() {
// Since now each chunk may include delete predicates, we need to create plan for each chunk but
// if there is no chunk, we still need to return a plan
if chunks.is_empty() {
plans.push(Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
output_schema,

View File

@ -1505,11 +1505,6 @@ impl DbSetup for DeleteMultiExprsFromOsOneMeasurementOneChunk {
}
}
// NGA todo next PR: Add these scenarios after deleted data is eliminated from scan
// 1. Many deletes, each has one or/and multi expressions
// 2. Many different-type chunks when a delete happens
// 3. Combination of above
#[derive(Debug)]
/// Setup for multi-expression delete query test with one table and one chunk moved from MUB to RUB to OS
/// Two deletes at different chunk stages
@ -1549,11 +1544,18 @@ impl DbSetup for TwoDeleteMultiExprsFromMubOneMeasurementOneChunk {
.build();
// delete happens when data in MUB
let scenario_mub = make_delete_mub_delete(lp_lines.clone(), pred1.clone(), pred2.clone()).await;
let scenario_mub =
make_delete_mub_delete(lp_lines.clone(), pred1.clone(), pred2.clone()).await;
// delete happens when data in MUB then moved to RUB
let scenario_rub =
make_delete_mub_to_rub_delete(lp_lines.clone(), pred1.clone(), pred2.clone(), table_name, partition_key).await;
let scenario_rub = make_delete_mub_to_rub_delete(
lp_lines.clone(),
pred1.clone(),
pred2.clone(),
table_name,
partition_key,
)
.await;
// delete happens when data in MUB then moved to RUB and then persisted
let scenario_rub_os = make_delete_mub_to_rub_and_os_delete(
@ -1567,7 +1569,8 @@ impl DbSetup for TwoDeleteMultiExprsFromMubOneMeasurementOneChunk {
// delete happens when data in MUB then moved to RUB, then persisted, and then RUB is unloaded
let _scenario_os =
make_delete_mub_to_os_delete(lp_lines.clone(), pred1, pred2, table_name, partition_key).await;
make_delete_mub_to_os_delete(lp_lines.clone(), pred1, pred2, table_name, partition_key)
.await;
// return scenarios to run queries
// NGA todo: BUG in scenario_os
@ -1615,11 +1618,24 @@ impl DbSetup for TwoDeleteMultiExprsFromRubOneMeasurementOneChunk {
.build();
// delete happens when data in MUB
let scenario_rub = make_delete_rub_delete(lp_lines.clone(), pred1.clone(), pred2.clone(), table_name, partition_key).await;
let scenario_rub = make_delete_rub_delete(
lp_lines.clone(),
pred1.clone(),
pred2.clone(),
table_name,
partition_key,
)
.await;
// delete happens when data in MUB then moved to RUB
let scenario_rub_delete =
make_delete_rub_to_os_delete(lp_lines.clone(), pred1.clone(), pred2.clone(), table_name, partition_key).await;
let scenario_rub_delete = make_delete_rub_to_os_delete(
lp_lines.clone(),
pred1.clone(),
pred2.clone(),
table_name,
partition_key,
)
.await;
// delete happens when data in MUB then moved to RUB and then persisted
let _scenario_rub_os = make_delete_rub_to_os_and_unload_rub_delete(
@ -1638,8 +1654,6 @@ impl DbSetup for TwoDeleteMultiExprsFromRubOneMeasurementOneChunk {
}
}
#[derive(Debug)]
/// Setup for multi-expression delete query test with one table and one chunk in OS
pub struct TwoDeleteMultiExprsFromOsOneMeasurementOneChunk {}
@ -1676,10 +1690,15 @@ impl DbSetup for TwoDeleteMultiExprsFromOsOneMeasurementOneChunk {
.add_expr(expr3)
.build();
// delete happens after data is persisted but still in RUB
let scenario_rub_os =
make_delete_os_with_rub_delete(lp_lines.clone(), pred1.clone(), pred2.clone(), table_name, partition_key)
.await;
// delete happens after data is persisted but still in RUB
let scenario_rub_os = make_delete_os_with_rub_delete(
lp_lines.clone(),
pred1.clone(),
pred2.clone(),
table_name,
partition_key,
)
.await;
// delete happens after data is persisted but still in RUB and then unload RUB
let _scenario_rub_os_unload_rub = make_delete_os_with_rub_then_unload_rub_delete(
@ -1691,8 +1710,8 @@ impl DbSetup for TwoDeleteMultiExprsFromOsOneMeasurementOneChunk {
)
.await;
// delete happens after data is persisted and unload RUB
let _scenario_os = make_delete_os_delete(
// delete happens after data is persisted and unload RUB
let _scenario_os = make_delete_os_delete(
lp_lines.clone(),
pred1.clone(),
pred2.clone(),
@ -1704,13 +1723,13 @@ impl DbSetup for TwoDeleteMultiExprsFromOsOneMeasurementOneChunk {
// return scenarios to run queries
// NGA todo: bug scenario_rub_os_unload_rub, scenario_os
vec![scenario_rub_os]
}
}
// NGA todo: Add scenarios that have many different types of chunks at the same time (MUB, RUB, OS) when a or several deletes happen
// -----------------------------------------------------------------------------
// Helper functions
// Helper functions
async fn make_delete_mub(lp_lines: Vec<&str>, pred: Predicate) -> DbScenario {
let db = make_db().await.db;
// create an open MUB
@ -1731,7 +1750,11 @@ async fn make_delete_mub(lp_lines: Vec<&str>, pred: Predicate) -> DbScenario {
}
}
async fn make_delete_mub_delete(lp_lines: Vec<&str>, pred1: Predicate, pred2: Predicate) -> DbScenario {
async fn make_delete_mub_delete(
lp_lines: Vec<&str>,
pred1: Predicate,
pred2: Predicate,
) -> DbScenario {
let db = make_db().await.db;
// create an open MUB
write_lp(&db, &lp_lines.join("\n")).await;
@ -1753,7 +1776,8 @@ async fn make_delete_mub_delete(lp_lines: Vec<&str>, pred1: Predicate, pred2: Pr
assert_eq!(count_object_store_chunks(&db), 0);
DbScenario {
scenario_name: "Deleted data from MUB then move and then delete data from frozen MUB".into(),
scenario_name: "Deleted data from MUB then move and then delete data from frozen MUB"
.into(),
db,
}
}
@ -1814,7 +1838,8 @@ async fn make_delete_mub_to_rub_delete(
assert_eq!(count_object_store_chunks(&db), 0);
DbScenario {
scenario_name: "Deleted data from MUB, then move to RUB, then delete data from RUB again".into(),
scenario_name: "Deleted data from MUB, then move to RUB, then delete data from RUB again"
.into(),
db,
}
}
@ -1891,7 +1916,9 @@ async fn make_delete_mub_to_rub_and_os_delete(
assert_eq!(count_object_store_chunks(&db), 1);
DbScenario {
scenario_name: "Deleted data from MUB then move to RUB and OS, then delete from RUB and OS again".into(),
scenario_name:
"Deleted data from MUB then move to RUB and OS, then delete from RUB and OS again"
.into(),
db,
}
}
@ -1967,7 +1994,7 @@ async fn make_delete_mub_to_os_delete(
db.unload_read_buffer(table_name, partition_key, ChunkId::new(1))
.unwrap();
// delete data from OS after RUb is unloaded
db.delete("cpu", Arc::new(pred2)).await.unwrap();
db.delete("cpu", Arc::new(pred2)).await.unwrap();
// No MUB, no RUB, one OS
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 0);
@ -2112,7 +2139,9 @@ async fn make_delete_rub_to_os_delete(
assert_eq!(count_object_store_chunks(&db), 1);
DbScenario {
scenario_name: "Deleted data in RUB and then persisted to OS then delete once more from RUB and OS".into(),
scenario_name:
"Deleted data in RUB and then persisted to OS then delete once more from RUB and OS"
.into(),
db,
}
}
@ -2195,7 +2224,9 @@ async fn make_delete_rub_to_os_and_unload_rub_delete(
assert_eq!(count_object_store_chunks(&db), 1);
DbScenario {
scenario_name: "Deleted data in RUB then persisted to OS then RUB unloaded then delete data from OS".into(),
scenario_name:
"Deleted data in RUB then persisted to OS then RUB unloaded then delete data from OS"
.into(),
db,
}
}
@ -2394,7 +2425,6 @@ async fn make_delete_os(
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 0);
assert_eq!(count_object_store_chunks(&db), 1);
DbScenario {
scenario_name: "Deleted data in OS and the delete happens after RUB is unloaded".into(),
@ -2402,7 +2432,6 @@ async fn make_delete_os(
}
}
async fn make_delete_os_delete(
lp_lines: Vec<&str>,
pred1: Predicate,
@ -2438,7 +2467,6 @@ async fn make_delete_os_delete(
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 0);
assert_eq!(count_object_store_chunks(&db), 1);
DbScenario {
scenario_name: "Deleted data in OS and the two delete happens after RUB is unloaded".into(),