Merge pull request #2601 from influxdata/ntran/delete_more_tests

fix: All chunks now are applied delete predicates during scan
pull/24376/head
kodiakhq[bot] 2021-09-21 18:57:10 +00:00 committed by GitHub
commit 4b0d6a8b08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1368 additions and 710 deletions

View File

@ -187,26 +187,24 @@ impl Predicate {
val
}
/// Merge the given delete predicates into this select predicate
/// Merge the given delete predicates into this select predicate.
/// Since we want to eliminate data filtered by the delete predicates,
/// they are first converted into their negated form: NOT(delete_predicate)
/// then added/merged into the select predicate
pub fn merge_delete_predicates<S>(&mut self, delete_predicates: &[S])
where
S: AsRef<Self>,
{
self.add_negated_delete_exprs(delete_predicates);
}
// Create a list of disjunctive negated expressions.
// Example: there are two deletes as follows (note that time_range is stored separated in the Predicate
// but we need to put it together with the exprs here)
// . Delete_1: WHERE city != "Boston" AND temp = 70 AND time_range in [10, 30)
// . Delete 2: WHERE state = "NY" AND route != "I90" AND time_range in [20, 50)
// The negated list will be "NOT(Delete_1)", NOT(Delete_2)" which means
// NOT(city != "Boston" AND temp = 70 AND time_range in [10, 30)), NOT(state = "NY" AND route != "I90" AND time_range in [20, 50)) which means
// [NOT(city = Boston") OR NOT(temp = 70) OR NOT(time_range in [10, 30))], [NOT(state = "NY") OR NOT(route != "I90") OR NOT(time_range in [20, 50))]
// Note that the "NOT(time_range in [20, 50))]" or "NOT(20 <= time < 50)"" is replaced with "time < 20 OR time >= 50"
/// Add a list of disjunctive negated expressions.
/// Example: there are two deletes as follows (note that time_range is stored separated in the Predicate
/// but we need to put it together with the exprs here)
/// . Delete_1: WHERE city != "Boston" AND temp = 70 AND time_range in [10, 30)
/// . Delete 2: WHERE state = "NY" AND route != "I90" AND time_range in [20, 50)
/// The negated list will be "NOT(Delete_1)", "NOT(Delete_2)" which means
/// NOT(city != "Boston" AND temp = 70 AND time_range in [10, 30)), NOT(state = "NY" AND route != "I90" AND time_range in [20, 50)) which means
/// [NOT(city = "Boston") OR NOT(temp = 70) OR NOT(time_range in [10, 30))], [NOT(state = "NY") OR NOT(route != "I90") OR NOT(time_range in [20, 50))]
/// Note that "NOT(time_range in [20, 50))", i.e. "NOT(20 <= time < 50)", is rewritten as "time < 20 OR time >= 50"
fn add_negated_delete_exprs<S>(&mut self, delete_predicates: &[S])
where
S: AsRef<Self>,
{
for pred in delete_predicates {
let pred = pred.as_ref();
@ -237,6 +235,8 @@ impl Predicate {
Some(e) => expr = Some(e.or(exp.clone().not())),
}
}
// Push the negated expression of the delete predicate into the list exprs of the select predicate
if let Some(e) = expr {
self.exprs.push(e);
}

View File

@ -742,11 +742,11 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
/// ▲
/// │
/// │
/// ┌───────────────────────
/// │ FilterExec
/// | To apply delete preds
/// │ (Chunk)
/// └───────────────────────
/// ┌─────────────────────────
/// │ FilterExec (optional)
/// | To apply delete preds
/// │ (Chunk)
/// └─────────────────────────
/// ▲
/// │
/// │
@ -861,19 +861,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
}
/// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
/// ```text
/// ┌─────────────────┐
/// │ SortExec │
/// │ (optional) │ <-- Only added if the input output_sort_key is not empty
/// └─────────────────┘
/// ▲
/// │
/// │
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Chunk) │
/// └─────────────────┘
///```
// And some optional operators on top such as applying delete predicates or sort the chunk
fn build_plan_for_non_duplicates_chunk(
table_name: Arc<str>,
output_schema: Arc<Schema>,
@ -890,30 +878,35 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
)
}
/// Return either:
/// the simplest IOx scan plan for many chunks which is IOxReadFilterNode
/// if the input output_sort_key is empty
/// Return either
/// the simplest IOx scan plan for no chunks which is IOxReadFilterNode:
/// ```text
/// ┌─────────────────┐
/// │IOxReadFilterNode│
/// │ (Many Chunks)
/// │ (No Chunks)
/// └─────────────────┘
///```
///
/// Otherwise, many plans like this
///
/// ```
/// Or, many plans, one for each chunk, like this:
/// ```text
/// ┌─────────────────┐ ┌─────────────────┐
/// │ SortExec │ │ SortExec │
/// │ (optional) │ │ (optional) │
/// └─────────────────┘ └─────────────────┘
/// ▲ ▲
/// │ ..... │
/// │ │
/// ┌─────────────────┐ ┌─────────────────┐
/// │IOxReadFilterNode│ │IOxReadFilterNode│
/// │ (Chunk 1) │ │ (Chunk n) │
/// └─────────────────┘ └─────────────────┘
/// ┌─────────────────┐ ┌─────────────────┐
/// │ SortExec │ │ SortExec │
/// │ (optional) │ │ (optional) │
/// └─────────────────┘ └─────────────────┘
/// ▲ ▲
/// │ │
/// │
/// ┌─────────────────────────┐ ┌─────────────────────────┐
/// │ FilterExec (optional) │ │ FilterExec (optional) │
/// | To apply delete preds │ ..... | To apply delete preds │
/// │ (Chunk) │ │ (Chunk) │
/// └─────────────────────────┘ └─────────────────────────┘
/// ▲ ▲
/// │ │
/// │ │
/// ┌─────────────────┐ ┌─────────────────┐
/// │IOxReadFilterNode│ │IOxReadFilterNode│
/// │ (Chunk 1) │ │ (Chunk n) │
/// └─────────────────┘ └─────────────────┘
///```
fn build_plans_for_non_duplicates_chunks(
table_name: Arc<str>,
@ -924,8 +917,9 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
) -> Result<Vec<Arc<dyn ExecutionPlan>>> {
let mut plans: Vec<Arc<dyn ExecutionPlan>> = vec![];
// output is not required to be sorted or no chunks provided, only create a read filter for all chunks
if output_sort_key.is_empty() || chunks.is_empty() {
// Since now each chunk may include delete predicates, we need to create plan for each chunk but
// if there is no chunk, we still need to return a plan
if chunks.is_empty() {
plans.push(Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
output_schema,

View File

@ -1,14 +1,14 @@
//! This module contains testing scenarios for Db
pub mod delete;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use data_types::chunk_metadata::ChunkId;
use datafusion::logical_plan::{col, lit};
use once_cell::sync::OnceCell;
use predicate::predicate::{Predicate, PredicateBuilder};
use query::QueryChunk;
use async_trait::async_trait;
@ -1212,650 +1212,3 @@ impl DbSetup for ChunkOrder {
vec![scenario]
}
}
/// Setup for delete query tests where the delete predicate is applied while
/// the single chunk is still in the Mutable Buffer (MUB), and the chunk is
/// then moved through the Read Buffer (RUB) to Object Store (OS).
/// The predicate must travel with the chunk through every stage.
#[derive(Debug)]
pub struct DeleteFromMubOneMeasurementOneChunk {}

#[async_trait]
impl DbSetup for DeleteFromMubOneMeasurementOneChunk {
    async fn make(&self) -> Vec<DbScenario> {
        // Shared setup for all scenarios: one table, one chunk.
        let partition_key = "1970-01-01T00";
        let table_name = "cpu";
        let lp_lines = vec!["cpu bar=1 10", "cpu bar=2 20"];

        // DELETE FROM cpu WHERE bar = 1.0 AND 0 <= time < 15
        let val: f64 = 1.0;
        let pred = PredicateBuilder::new()
            .table("cpu")
            .timestamp_range(0, 15)
            .add_expr(col("bar").eq(lit(val)))
            .build();

        // Delete while the data is in the MUB.
        let scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await;
        // Delete in MUB, then move the chunk to RUB.
        let scenario_rub =
            make_delete_mub_to_rub(lp_lines.clone(), pred.clone(), table_name, partition_key).await;
        // Delete in MUB, move to RUB, then persist.
        let scenario_rub_os = make_delete_mub_to_rub_and_os(
            lp_lines.clone(),
            pred.clone(),
            table_name,
            partition_key,
        )
        .await;
        // Delete in MUB, move to RUB, persist, then unload the RUB copy.
        let scenario_os =
            make_delete_mub_to_os(lp_lines.clone(), pred, table_name, partition_key).await;

        // Scenarios for the query tests to run against.
        vec![scenario_mub, scenario_rub, scenario_rub_os, scenario_os]
    }
}
/// Setup for delete query tests where the delete predicate is applied while
/// the single chunk lives in the Read Buffer (RUB), and the chunk is then
/// persisted to Object Store (OS). The predicate must travel with the chunk.
#[derive(Debug)]
pub struct DeleteFromRubOneMeasurementOneChunk {}

#[async_trait]
impl DbSetup for DeleteFromRubOneMeasurementOneChunk {
    async fn make(&self) -> Vec<DbScenario> {
        // Shared setup for all scenarios: one table, one chunk.
        let partition_key = "1970-01-01T00";
        let table_name = "cpu";
        let lp_lines = vec!["cpu bar=1 10", "cpu bar=2 20"];

        // DELETE FROM cpu WHERE bar = 1.0 AND 0 <= time < 15
        let val: f64 = 1.0;
        let pred = PredicateBuilder::new()
            .table("cpu")
            .timestamp_range(0, 15)
            .add_expr(col("bar").eq(lit(val)))
            .build();

        // Delete while the chunk is in the RUB only.
        let scenario_rub =
            make_delete_rub(lp_lines.clone(), pred.clone(), table_name, partition_key).await;
        // Delete in RUB, then persist the chunk.
        let scenario_rub_os =
            make_delete_rub_to_os(lp_lines.clone(), pred.clone(), table_name, partition_key).await;
        // Delete in RUB, persist, then unload the RUB copy.
        let scenario_os =
            make_delete_rub_to_os_and_unload_rub(lp_lines.clone(), pred, table_name, partition_key)
                .await;

        // Scenarios for the query tests to run against.
        vec![scenario_rub, scenario_rub_os, scenario_os]
    }
}
/// Setup for delete query tests where the delete predicate is applied after
/// the single chunk has been persisted (chunk present in both RUB and OS).
#[derive(Debug)]
pub struct DeleteFromOsOneMeasurementOneChunk {}

#[async_trait]
impl DbSetup for DeleteFromOsOneMeasurementOneChunk {
    async fn make(&self) -> Vec<DbScenario> {
        // Shared setup for all scenarios: one table, one chunk.
        let partition_key = "1970-01-01T00";
        let table_name = "cpu";
        let lp_lines = vec!["cpu bar=1 10", "cpu bar=2 20"];

        // DELETE FROM cpu WHERE bar = 1.0 AND 0 <= time < 15
        let val: f64 = 1.0;
        let pred = PredicateBuilder::new()
            .table("cpu")
            .timestamp_range(0, 15)
            .add_expr(col("bar").eq(lit(val)))
            .build();

        // Delete after persist while the RUB copy is still loaded.
        let scenario_rub_os =
            make_delete_os_with_rub(lp_lines.clone(), pred.clone(), table_name, partition_key)
                .await;
        // Delete after persist, then unload the RUB copy (built but not yet returned).
        let _scenario_rub_os_unload_rub = make_delete_os_with_rub_then_unload_rub(
            lp_lines.clone(),
            pred.clone(),
            table_name,
            partition_key,
        )
        .await;
        // Delete after persist with the RUB copy already unloaded (built but not yet returned).
        let _scenario_os = make_delete_os(lp_lines.clone(), pred, table_name, partition_key).await;

        // NGA todo: turn these 2 OS scenarios on. May need to wait for Marco to
        // finish persisting delete predicates first.
        // vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os]
        vec![scenario_rub_os]
    }
}
/// Setup for delete query tests with a multi-expression delete predicate
/// applied while the single chunk is still in the MUB, after which the chunk
/// is moved through RUB to OS. The predicate must travel with the chunk.
#[derive(Debug)]
pub struct DeleteMultiExprsFromMubOneMeasurementOneChunk {}

#[async_trait]
impl DbSetup for DeleteMultiExprsFromMubOneMeasurementOneChunk {
    async fn make(&self) -> Vec<DbScenario> {
        // Shared setup for all scenarios: one table, one chunk.
        let partition_key = "1970-01-01T00";
        let table_name = "cpu";
        let lp_lines = vec![
            "cpu,foo=me bar=1 10",
            "cpu,foo=you bar=2 20",
            "cpu,foo=me bar=1 30",
            "cpu,foo=me bar=1 40",
        ];

        // DELETE FROM cpu WHERE bar = 1.0 AND foo = 'me' AND 0 <= time < 32
        let val: f64 = 1.0;
        let pred = PredicateBuilder::new()
            .table("cpu")
            .timestamp_range(0, 32)
            .add_expr(col("bar").eq(lit(val)))
            .add_expr(col("foo").eq(lit("me")))
            .build();

        // Delete while the data is in the MUB.
        let scenario_mub = make_delete_mub(lp_lines.clone(), pred.clone()).await;
        // Delete in MUB, then move the chunk to RUB.
        let scenario_rub =
            make_delete_mub_to_rub(lp_lines.clone(), pred.clone(), table_name, partition_key).await;
        // Delete in MUB, move to RUB, then persist.
        let scenario_rub_os = make_delete_mub_to_rub_and_os(
            lp_lines.clone(),
            pred.clone(),
            table_name,
            partition_key,
        )
        .await;
        // Delete in MUB, move to RUB, persist, then unload the RUB copy.
        let scenario_os =
            make_delete_mub_to_os(lp_lines.clone(), pred, table_name, partition_key).await;

        // Scenarios for the query tests to run against.
        vec![scenario_mub, scenario_rub, scenario_rub_os, scenario_os]
    }
}
/// Setup for delete query tests with a multi-expression delete predicate
/// applied while the single chunk is in the RUB, after which the chunk is
/// persisted to OS. The predicate must travel with the chunk.
#[derive(Debug)]
pub struct DeleteMultiExprsFromRubOneMeasurementOneChunk {}

#[async_trait]
impl DbSetup for DeleteMultiExprsFromRubOneMeasurementOneChunk {
    async fn make(&self) -> Vec<DbScenario> {
        // Shared setup for all scenarios: one table, one chunk.
        let partition_key = "1970-01-01T00";
        let table_name = "cpu";
        let lp_lines = vec![
            "cpu,foo=me bar=1 10",
            "cpu,foo=you bar=2 20",
            "cpu,foo=me bar=1 30",
            "cpu,foo=me bar=1 40",
        ];

        // DELETE FROM cpu WHERE bar = 1.0 AND foo = 'me' AND 0 <= time < 32
        let val: f64 = 1.0;
        let pred = PredicateBuilder::new()
            .table("cpu")
            .timestamp_range(0, 32)
            .add_expr(col("bar").eq(lit(val)))
            .add_expr(col("foo").eq(lit("me")))
            .build();

        // Delete while the chunk is in the RUB only.
        let scenario_rub =
            make_delete_rub(lp_lines.clone(), pred.clone(), table_name, partition_key).await;
        // Delete in RUB, then persist the chunk.
        let scenario_rub_os =
            make_delete_rub_to_os(lp_lines.clone(), pred.clone(), table_name, partition_key).await;
        // Delete in RUB, persist, then unload the RUB copy.
        let scenario_os =
            make_delete_rub_to_os_and_unload_rub(lp_lines.clone(), pred, table_name, partition_key)
                .await;

        // Scenarios for the query tests to run against.
        vec![scenario_rub, scenario_rub_os, scenario_os]
    }
}
/// Setup for delete query tests with a multi-expression delete predicate
/// applied after the single chunk has been persisted (present in RUB and OS).
#[derive(Debug)]
pub struct DeleteMultiExprsFromOsOneMeasurementOneChunk {}

#[async_trait]
impl DbSetup for DeleteMultiExprsFromOsOneMeasurementOneChunk {
    async fn make(&self) -> Vec<DbScenario> {
        // Shared setup for all scenarios: one table, one chunk.
        let partition_key = "1970-01-01T00";
        let table_name = "cpu";
        let lp_lines = vec![
            "cpu,foo=me bar=1 10",
            "cpu,foo=you bar=2 20",
            "cpu,foo=me bar=1 30",
            "cpu,foo=me bar=1 40",
        ];

        // DELETE FROM cpu WHERE bar = 1.0 AND foo = 'me' AND 0 <= time < 32
        let val: f64 = 1.0;
        let pred = PredicateBuilder::new()
            .table("cpu")
            .timestamp_range(0, 32)
            .add_expr(col("bar").eq(lit(val)))
            .add_expr(col("foo").eq(lit("me")))
            .build();

        // Delete after persist while the RUB copy is still loaded.
        let scenario_rub_os =
            make_delete_os_with_rub(lp_lines.clone(), pred.clone(), table_name, partition_key)
                .await;
        // Delete after persist, then unload the RUB copy (built but not yet returned).
        let _scenario_rub_os_unload_rub = make_delete_os_with_rub_then_unload_rub(
            lp_lines.clone(),
            pred.clone(),
            table_name,
            partition_key,
        )
        .await;
        // Delete after persist with the RUB copy already unloaded (built but not yet returned).
        let _scenario_os = make_delete_os(lp_lines.clone(), pred, table_name, partition_key).await;

        // NGA todo: turn these 2 OS scenarios on. May need to wait for Marco to
        // finish persisting delete predicates first.
        //vec![scenario_rub_os, scenario_rub_os_unload_rub, scenario_os]
        vec![scenario_rub_os]
    }
}
// NGA todo next PR: Add these scenarios after deleted data is eliminated from scan
// 1. Many deletes, each has one or/and multi expressions
// 2. Many different-type chunks when a delete happens
// 3. Combination of above
/// Create a scenario where line-protocol data lands in an open MUB chunk and
/// the given delete predicate is applied while the data is still in the MUB.
/// The delete leaves the chunk in the MUB (frozen); no RUB or OS chunks exist.
///
/// NOTE(review): the table name "cpu" is hard-coded here (this helper takes
/// no `table_name` parameter) — fine for current callers, which all use "cpu".
async fn make_delete_mub(lp_lines: Vec<&str>, pred: Predicate) -> DbScenario {
let db = make_db().await.db;
// create an open MUB
write_lp(&db, &lp_lines.join("\n")).await;
// One open MUB, no RUB, no OS
assert_eq!(count_mutable_buffer_chunks(&db), 1);
assert_eq!(count_read_buffer_chunks(&db), 0);
assert_eq!(count_object_store_chunks(&db), 0);
db.delete("cpu", Arc::new(pred)).await.unwrap();
// Still one but frozen MUB, no RUB, no OS
assert_eq!(count_mutable_buffer_chunks(&db), 1);
assert_eq!(count_read_buffer_chunks(&db), 0);
assert_eq!(count_object_store_chunks(&db), 0);
DbScenario {
scenario_name: "Deleted data in MUB".into(),
db,
}
}
/// Create a scenario: write data into an open MUB chunk, apply the delete
/// predicate while the data is still in the MUB, then roll over and move the
/// chunk to the Read Buffer. The delete predicate travels with the chunk.
///
/// Ends with: no MUB chunk, one RUB chunk, no OS chunk.
async fn make_delete_mub_to_rub(
    lp_lines: Vec<&str>,
    pred: Predicate,
    table_name: &str,
    partition_key: &str,
) -> DbScenario {
    let db = make_db().await.db;
    // create an open MUB
    write_lp(&db, &lp_lines.join("\n")).await;
    // delete data in MUB
    // (use the caller-supplied table rather than a hard-coded "cpu" so this
    // helper works for any table; all current callers pass "cpu")
    db.delete(table_name, Arc::new(pred)).await.unwrap();
    // move MUB to RUB and the delete predicate will be automatically included in RUB
    db.rollover_partition(table_name, partition_key)
        .await
        .unwrap();
    db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0))
        .await
        .unwrap();
    // No MUB, one RUB, no OS
    assert_eq!(count_mutable_buffer_chunks(&db), 0);
    assert_eq!(count_read_buffer_chunks(&db), 1);
    assert_eq!(count_object_store_chunks(&db), 0);
    DbScenario {
        scenario_name: "Deleted data in RUB moved from MUB".into(),
        db,
    }
}
/// Create a scenario: write data into an open MUB chunk, apply the delete
/// predicate in the MUB, move the chunk to the Read Buffer, then persist it.
/// The delete predicate travels with the chunk into both RUB and OS.
///
/// Ends with: no MUB chunk, one RUB chunk, one OS chunk.
async fn make_delete_mub_to_rub_and_os(
    lp_lines: Vec<&str>,
    pred: Predicate,
    table_name: &str,
    partition_key: &str,
) -> DbScenario {
    let db = make_db().await.db;
    // create an open MUB
    write_lp(&db, &lp_lines.join("\n")).await;
    // delete data in MUB
    // (use the caller-supplied table rather than a hard-coded "cpu")
    db.delete(table_name, Arc::new(pred)).await.unwrap();
    // move MUB to RUB and the delete predicate will be automatically included in RUB
    db.rollover_partition(table_name, partition_key)
        .await
        .unwrap();
    db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0))
        .await
        .unwrap();
    // persist RUB and the delete predicate will be automatically included in the OS chunk
    db.persist_partition(
        table_name,
        partition_key,
        Instant::now() + Duration::from_secs(1),
    )
    .await
    .unwrap();
    // No MUB, one RUB, one OS
    assert_eq!(count_mutable_buffer_chunks(&db), 0);
    assert_eq!(count_read_buffer_chunks(&db), 1);
    assert_eq!(count_object_store_chunks(&db), 1);
    DbScenario {
        scenario_name: "Deleted data in RUB and OS".into(),
        db,
    }
}
/// Create a scenario: write data into an open MUB chunk, apply the delete
/// predicate in the MUB, move the chunk to the Read Buffer, persist it, then
/// unload the RUB copy so only the OS chunk remains.
///
/// Ends with: no MUB chunk, no RUB chunk, one OS chunk.
async fn make_delete_mub_to_os(
    lp_lines: Vec<&str>,
    pred: Predicate,
    table_name: &str,
    partition_key: &str,
) -> DbScenario {
    let db = make_db().await.db;
    // create an open MUB
    write_lp(&db, &lp_lines.join("\n")).await;
    // delete data in MUB
    // (use the caller-supplied table rather than a hard-coded "cpu")
    db.delete(table_name, Arc::new(pred)).await.unwrap();
    // move MUB to RUB and the delete predicate will be automatically included in RUB
    db.rollover_partition(table_name, partition_key)
        .await
        .unwrap();
    db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0))
        .await
        .unwrap();
    // persist RUB and the delete predicate will be automatically included in the OS chunk
    db.persist_partition(
        table_name,
        partition_key,
        Instant::now() + Duration::from_secs(1),
    )
    .await
    .unwrap();
    // remove RUB (persist produced chunk id 1)
    db.unload_read_buffer(table_name, partition_key, ChunkId::new(1))
        .unwrap();
    // No MUB, no RUB, one OS
    assert_eq!(count_mutable_buffer_chunks(&db), 0);
    assert_eq!(count_read_buffer_chunks(&db), 0);
    assert_eq!(count_object_store_chunks(&db), 1);
    DbScenario {
        scenario_name: "Deleted data in OS".into(),
        db,
    }
}
/// Create a scenario: write data, move the chunk from MUB to the Read Buffer,
/// then apply the delete predicate while the data lives in the RUB.
///
/// Ends with: no MUB chunk, one RUB chunk, no OS chunk.
async fn make_delete_rub(
    lp_lines: Vec<&str>,
    pred: Predicate,
    table_name: &str,
    partition_key: &str,
) -> DbScenario {
    let db = make_db().await.db;
    // create an open MUB
    write_lp(&db, &lp_lines.join("\n")).await;
    // move MUB to RUB
    db.rollover_partition(table_name, partition_key)
        .await
        .unwrap();
    db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0))
        .await
        .unwrap();
    // delete data in RUB
    // (use the caller-supplied table rather than a hard-coded "cpu")
    db.delete(table_name, Arc::new(pred)).await.unwrap();
    // No MUB, one RUB, no OS
    assert_eq!(count_mutable_buffer_chunks(&db), 0);
    assert_eq!(count_read_buffer_chunks(&db), 1);
    assert_eq!(count_object_store_chunks(&db), 0);
    DbScenario {
        scenario_name: "Deleted data in RUB".into(),
        db,
    }
}
/// Create a scenario: write data, move the chunk from MUB to the Read Buffer,
/// apply the delete predicate in the RUB, then persist the chunk. The delete
/// predicate travels with the chunk into the OS copy.
///
/// Ends with: no MUB chunk, one RUB chunk, one OS chunk.
async fn make_delete_rub_to_os(
    lp_lines: Vec<&str>,
    pred: Predicate,
    table_name: &str,
    partition_key: &str,
) -> DbScenario {
    let db = make_db().await.db;
    // create an open MUB
    write_lp(&db, &lp_lines.join("\n")).await;
    // move MUB to RUB
    db.rollover_partition(table_name, partition_key)
        .await
        .unwrap();
    db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0))
        .await
        .unwrap();
    // delete data in RUB
    // (use the caller-supplied table rather than a hard-coded "cpu")
    db.delete(table_name, Arc::new(pred)).await.unwrap();
    // persist RUB and the delete predicate will be automatically included in the OS chunk
    db.persist_partition(
        table_name,
        partition_key,
        Instant::now() + Duration::from_secs(1),
    )
    .await
    .unwrap();
    // No MUB, one RUB, one OS
    assert_eq!(count_mutable_buffer_chunks(&db), 0);
    assert_eq!(count_read_buffer_chunks(&db), 1);
    assert_eq!(count_object_store_chunks(&db), 1);
    DbScenario {
        scenario_name: "Deleted data in RUB and then persisted to OS".into(),
        db,
    }
}
/// Create a scenario: write data, move the chunk from MUB to the Read Buffer,
/// apply the delete predicate in the RUB, persist the chunk, then unload the
/// RUB copy so only the OS chunk remains.
///
/// Ends with: no MUB chunk, no RUB chunk, one OS chunk.
async fn make_delete_rub_to_os_and_unload_rub(
    lp_lines: Vec<&str>,
    pred: Predicate,
    table_name: &str,
    partition_key: &str,
) -> DbScenario {
    let db = make_db().await.db;
    // create an open MUB
    write_lp(&db, &lp_lines.join("\n")).await;
    // move MUB to RUB
    db.rollover_partition(table_name, partition_key)
        .await
        .unwrap();
    db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0))
        .await
        .unwrap();
    // delete data in RUB
    // (use the caller-supplied table rather than a hard-coded "cpu")
    db.delete(table_name, Arc::new(pred)).await.unwrap();
    // persist RUB and the delete predicate will be automatically included in the OS chunk
    db.persist_partition(
        table_name,
        partition_key,
        Instant::now() + Duration::from_secs(1),
    )
    .await
    .unwrap();
    // remove RUB (persist produced chunk id 1)
    db.unload_read_buffer(table_name, partition_key, ChunkId::new(1))
        .unwrap();
    // No MUB, no RUB, one OS
    assert_eq!(count_mutable_buffer_chunks(&db), 0);
    assert_eq!(count_read_buffer_chunks(&db), 0);
    assert_eq!(count_object_store_chunks(&db), 1);
    DbScenario {
        scenario_name: "Deleted data in RUB then persisted to OS then RUB unloaded".into(),
        db,
    }
}
/// Create a scenario: write data, move the chunk from MUB to the Read Buffer,
/// persist it, then apply the delete predicate while the chunk exists in both
/// RUB and OS.
///
/// Ends with: no MUB chunk, one RUB chunk, one OS chunk.
async fn make_delete_os_with_rub(
    lp_lines: Vec<&str>,
    pred: Predicate,
    table_name: &str,
    partition_key: &str,
) -> DbScenario {
    let db = make_db().await.db;
    // create an open MUB
    write_lp(&db, &lp_lines.join("\n")).await;
    // move MUB to RUB
    db.rollover_partition(table_name, partition_key)
        .await
        .unwrap();
    db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0))
        .await
        .unwrap();
    // persist RUB and the delete predicate will be automatically included in the OS chunk
    db.persist_partition(
        table_name,
        partition_key,
        Instant::now() + Duration::from_secs(1),
    )
    .await
    .unwrap();
    // delete data after persisted but RUB still available
    // (use the caller-supplied table rather than a hard-coded "cpu")
    db.delete(table_name, Arc::new(pred)).await.unwrap();
    // No MUB, one RUB, one OS
    assert_eq!(count_mutable_buffer_chunks(&db), 0);
    assert_eq!(count_read_buffer_chunks(&db), 1);
    assert_eq!(count_object_store_chunks(&db), 1);
    DbScenario {
        scenario_name: "Deleted data in OS with RUB".into(),
        db,
    }
}
/// Create a scenario: write data, move the chunk from MUB to the Read Buffer,
/// persist it, apply the delete predicate while both RUB and OS copies exist,
/// then unload the RUB copy so only the OS chunk remains.
///
/// Ends with: no MUB chunk, no RUB chunk, one OS chunk.
async fn make_delete_os_with_rub_then_unload_rub(
    lp_lines: Vec<&str>,
    pred: Predicate,
    table_name: &str,
    partition_key: &str,
) -> DbScenario {
    let db = make_db().await.db;
    // create an open MUB
    write_lp(&db, &lp_lines.join("\n")).await;
    // move MUB to RUB
    db.rollover_partition(table_name, partition_key)
        .await
        .unwrap();
    db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0))
        .await
        .unwrap();
    // persist RUB and the delete predicate will be automatically included in the OS chunk
    db.persist_partition(
        table_name,
        partition_key,
        Instant::now() + Duration::from_secs(1),
    )
    .await
    .unwrap();
    // delete data after persisted but RUB still available
    // (use the caller-supplied table rather than a hard-coded "cpu")
    db.delete(table_name, Arc::new(pred)).await.unwrap();
    // remove RUB (persist produced chunk id 1)
    db.unload_read_buffer(table_name, partition_key, ChunkId::new(1))
        .unwrap();
    // No MUB, no RUB, one OS
    assert_eq!(count_mutable_buffer_chunks(&db), 0);
    assert_eq!(count_read_buffer_chunks(&db), 0);
    assert_eq!(count_object_store_chunks(&db), 1);
    DbScenario {
        scenario_name: "Deleted data in OS only but the delete happens before RUB is unloaded"
            .into(),
        db,
    }
}
/// Create a scenario: write data, move the chunk from MUB to the Read Buffer,
/// persist it, unload the RUB copy, and only then apply the delete predicate —
/// i.e. the delete targets a chunk that exists solely in object store.
///
/// Ends with: no MUB chunk, no RUB chunk, one OS chunk.
async fn make_delete_os(
    lp_lines: Vec<&str>,
    pred: Predicate,
    table_name: &str,
    partition_key: &str,
) -> DbScenario {
    let db = make_db().await.db;
    // create an open MUB
    write_lp(&db, &lp_lines.join("\n")).await;
    // move MUB to RUB
    db.rollover_partition(table_name, partition_key)
        .await
        .unwrap();
    db.move_chunk_to_read_buffer(table_name, partition_key, ChunkId::new(0))
        .await
        .unwrap();
    // persist RUB and the delete predicate will be automatically included in the OS chunk
    db.persist_partition(
        table_name,
        partition_key,
        Instant::now() + Duration::from_secs(1),
    )
    .await
    .unwrap();
    // remove RUB (persist produced chunk id 1)
    db.unload_read_buffer(table_name, partition_key, ChunkId::new(1))
        .unwrap();
    // delete data after the chunk is persisted AND the RUB copy is unloaded
    // (use the caller-supplied table rather than a hard-coded "cpu")
    db.delete(table_name, Arc::new(pred)).await.unwrap();
    // No MUB, no RUB, one OS
    assert_eq!(count_mutable_buffer_chunks(&db), 0);
    assert_eq!(count_read_buffer_chunks(&db), 0);
    assert_eq!(count_object_store_chunks(&db), 1);
    DbScenario {
        scenario_name: "Deleted data in OS and the delete happens after RUB is unloaded".into(),
        db,
    }
}

File diff suppressed because it is too large Load Diff

View File

@ -3,6 +3,8 @@
//! wired all the pieces together (as well as ensure any particularly
//! important SQL does not regress)
use crate::scenarios;
use super::scenarios::*;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
@ -823,21 +825,21 @@ async fn sql_select_with_deleted_data_from_one_expr() {
// Data deleted when it is in MUB, and then moved to RUB and OS
run_sql_test_case!(
DeleteFromMubOneMeasurementOneChunk {},
scenarios::delete::DeleteFromMubOneMeasurementOneChunk {},
"SELECT * from cpu",
&expected
);
// Data deleted when it is in RUB, and then moved OS
run_sql_test_case!(
DeleteFromRubOneMeasurementOneChunk {},
scenarios::delete::DeleteFromRubOneMeasurementOneChunk {},
"SELECT * from cpu",
&expected
);
// Data deleted when it is in OS
run_sql_test_case!(
DeleteFromOsOneMeasurementOneChunk {},
scenarios::delete::DeleteFromOsOneMeasurementOneChunk {},
"SELECT * from cpu",
&expected
);
@ -856,21 +858,53 @@ async fn sql_select_with_deleted_data_from_multi_exprs() {
// Data deleted when it is in MUB, and then moved to RUB and OS
run_sql_test_case!(
DeleteMultiExprsFromMubOneMeasurementOneChunk {},
scenarios::delete::DeleteMultiExprsFromMubOneMeasurementOneChunk {},
"SELECT * from cpu",
&expected
);
// Data deleted when it is in RUB, and then moved OS
run_sql_test_case!(
DeleteMultiExprsFromRubOneMeasurementOneChunk {},
scenarios::delete::DeleteMultiExprsFromRubOneMeasurementOneChunk {},
"SELECT * from cpu",
&expected
);
// Data deleted when it is in OS
run_sql_test_case!(
DeleteMultiExprsFromOsOneMeasurementOneChunk {},
scenarios::delete::DeleteMultiExprsFromOsOneMeasurementOneChunk {},
"SELECT * from cpu",
&expected
);
}
#[tokio::test]
async fn sql_select_with_two_deleted_data_from_multi_exprs() {
let expected = vec![
"+-----+-----+--------------------------------+",
"| bar | foo | time |",
"+-----+-----+--------------------------------+",
"| 1 | me | 1970-01-01T00:00:00.000000040Z |",
"+-----+-----+--------------------------------+",
];
// Data deleted when it is in MUB, and then moved to RUB and OS
run_sql_test_case!(
scenarios::delete::TwoDeleteMultiExprsFromMubOneMeasurementOneChunk {},
"SELECT * from cpu",
&expected
);
// Data deleted when it is in RUB, and then moved OS
run_sql_test_case!(
scenarios::delete::TwoDeleteMultiExprsFromRubOneMeasurementOneChunk {},
"SELECT * from cpu",
&expected
);
// Data deleted when it is in OS
run_sql_test_case!(
scenarios::delete::TwoDeleteMultiExprsFromOsOneMeasurementOneChunk {},
"SELECT * from cpu",
&expected
);