test: more delete tests

pull/24376/head
Nga Tran 2021-09-22 16:38:27 -04:00
parent ae40d93af4
commit 400ec93498
4 changed files with 130 additions and 207 deletions

View File

@@ -142,6 +142,9 @@ jobs:
# "1" means line tables only, which is useful for panic tracebacks.
RUSTFLAGS: "-C debuginfo=1"
RUST_BACKTRACE: "1"
# set min stack size as a workaround to avoid stack overflow bug in DF
# https://github.com/apache/arrow-datafusion/issues/419
RUST_MIN_STACK: "10485760"
steps:
- checkout
- rust_components
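For context, RUST_MIN_STACK is read by the Rust standard library and sets the minimum stack size, in bytes, for threads spawned without an explicit size; 10485760 is 10 MiB, well above the 2 MiB default. A minimal sketch of the behavior this workaround relies on (the recursive function and its depth are illustrative, not taken from the DataFusion issue):

// Sketch: RUST_MIN_STACK raises the stack size of spawned threads,
// which is where deep recursion (such as the DataFusion plan walking
// behind apache/arrow-datafusion#419) would otherwise overflow.
fn recurse(n: u64) -> u64 {
    if n == 0 { 0 } else { 1 + recurse(n - 1) }
}

fn main() {
    // With RUST_MIN_STACK=10485760 this thread gets a 10 MiB stack;
    // with the 2 MiB default, a deep enough recursion aborts with
    // "thread '...' has overflowed its stack".
    let handle = std::thread::spawn(|| recurse(50_000));
    println!("depth reached: {}", handle.join().unwrap());
}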

View File

@@ -525,7 +525,6 @@ impl DbSetup for TwoDeleteMultiExprsFromOsOneMeasurementOneChunk {
pub struct ThreeDeleteThreeChunks {}
#[async_trait]
impl DbSetup for ThreeDeleteThreeChunks {
async fn make(&self) -> Vec<DbScenario> {
// General setup for all scenarios
let partition_key = "1970-01-01T00";
@@ -533,14 +532,14 @@ impl DbSetup for ThreeDeleteThreeChunks {
// chunk 1 data
let lp_lines_1 = vec![
"cpu,foo=me bar=1 10",
"cpu,foo=you bar=2 20",
"cpu,foo=me bar=1 30",
"cpu,foo=me bar=1 10", // deleted by pred1
"cpu,foo=you bar=2 20", // deleted by pred2
"cpu,foo=me bar=1 30", // deleted by pred1
"cpu,foo=me bar=1 40",
];
// delete predicate on chunk 1
let expr1 = col("bar").eq(lit(1f64));
let expr2 = col("foo").eq(lit("me"));
let pred1 = PredicateBuilder::new()
.table("cpu")
@@ -552,7 +551,7 @@ impl DbSetup for ThreeDeleteThreeChunks {
// chunk 2 data
let lp_lines_2 = vec![
"cpu,foo=me bar=1 42",
"cpu,foo=you bar=3 42",
"cpu,foo=you bar=3 42", // deleted by pred2
"cpu,foo=me bar=4 50",
"cpu,foo=me bar=5 60",
];
@@ -566,10 +565,10 @@ impl DbSetup for ThreeDeleteThreeChunks {
// chunk 3 data
let lp_lines_3 = vec![
"cpu,foo=me bar=1 60",
"cpu,foo=me bar=1 62",
"cpu,foo=you bar=3 70",
"cpu,foo=me bar=7 80",
"cpu,foo=me bar=8 90",
"cpu,foo=me bar=8 90", // deleted by pred3
];
// delete predicate on chunk 3
let i: f64 = 7.0;
@@ -580,86 +579,41 @@ impl DbSetup for ThreeDeleteThreeChunks {
.add_expr(expr)
.build();
let lp_data = vec![lp_lines_1, lp_lines_2, lp_lines_3];
let preds = vec![pred1, pred2, pred3];
// 3 chunks: MUB, RUB, OS
let scenario_mub_rub_os =
make_mub_rub_os_deletes(&lp_data, &preds, table_name, partition_key).await;
// 3 chunks: 2 MUB, 1 RUB
let scenario_2mub_rub =
make_2mub_rub_deletes(&lp_data, &preds, table_name, partition_key).await;
// 3 chunks: 2 MUB, 1 OS
let scenario_2mub_os =
make_2mub_os_deletes(&lp_data, &preds, table_name, partition_key).await;
// 3 chunks: 2 RUB, 1 OS
let scenario_2rub_os =
make_2rub_os_deletes(&lp_data, &preds, table_name, partition_key).await;
// 3 chunks: RUB, 2 OS
let scenario_rub_2os =
make_rub_2os_deletes(&lp_data, &preds, table_name, partition_key).await;
// 3 chunks: 3 OS
let scenario_3os = make_3os_deletes(&lp_data, &preds, table_name, partition_key).await;
// return scenarios to run queries
vec![
scenario_mub_rub_os,
scenario_2mub_rub,
scenario_2mub_os,
scenario_2rub_os,
scenario_rub_2os,
scenario_3os,
]
}
}
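A note on the semantics the hunk boundaries partly hide: each delete predicate combines its column conditions (for pred1, col("bar").eq(lit(1f64)) together with col("foo").eq(lit("me"))) with a timestamp range given to the PredicateBuilder, and a row is removed once any predicate matches it completely. A self-contained toy model of that rule, with hypothetical time bounds since the builder calls are cut off above:

// Toy model of delete-predicate matching; not IOx's implementation.
#[derive(Clone)]
struct Row {
    foo: &'static str,
    bar: f64,
    time: i64,
}

struct DeletePred {
    foo: Option<&'static str>, // None = column unconstrained
    bar: Option<f64>,
    time: (i64, i64), // inclusive range; the bounds used below are assumptions
}

impl DeletePred {
    fn matches(&self, r: &Row) -> bool {
        self.foo.map_or(true, |f| f == r.foo)
            && self.bar.map_or(true, |b| b == r.bar)
            && (self.time.0..=self.time.1).contains(&r.time)
    }
}

// A row survives only if no delete predicate matches it.
fn surviving(rows: &[Row], preds: &[DeletePred]) -> Vec<Row> {
    rows.iter()
        .filter(|r| preds.iter().all(|p| !p.matches(r)))
        .cloned()
        .collect()
}

fn main() {
    // Chunk 1 from the setup above; pred1 = (bar=1 AND foo=me) over a
    // time range assumed to cover timestamps 10 and 30 but not 40.
    let chunk1 = [
        Row { foo: "me", bar: 1.0, time: 10 },
        Row { foo: "you", bar: 2.0, time: 20 },
        Row { foo: "me", bar: 1.0, time: 30 },
        Row { foo: "me", bar: 1.0, time: 40 },
    ];
    let pred1 = DeletePred { foo: Some("me"), bar: Some(1.0), time: (0, 32) };
    // The rows at 20 (bar=2) and 40 (outside the range) survive pred1.
    assert_eq!(surviving(&chunk1, &[pred1]).len(), 2);
}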
@@ -1410,20 +1364,15 @@ async fn make_delete_os_delete(
}
async fn make_mub_rub_os_deletes(
lp_data: &[Vec<&str>],
preds: &[Predicate],
table_name: &str,
partition_key: &str,
) -> DbScenario {
let db = make_db().await.db;
// chunk 1 is an OS chunk
write_lp(&db, &lp_data[0].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1441,7 +1390,7 @@ async fn make_mub_rub_os_deletes(
.unwrap();
// Chunk 2 is a RUB
write_lp(&db, &lp_data[1].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1450,7 +1399,7 @@ async fn make_mub_rub_os_deletes(
.unwrap();
// Chunk 3 is a MUB
write_lp(&db, &lp_data[2].join("\n")).await;
// 1 MUB, 1 RUB, 1 OS
assert_eq!(count_mutable_buffer_chunks(&db), 1);
@@ -1458,9 +1407,9 @@ async fn make_mub_rub_os_deletes(
assert_eq!(count_object_store_chunks(&db), 1);
// Issue 3 deletes
db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap();
DbScenario {
scenario_name: "Deleted data from MUB, RUB, and OS".into(),
@@ -1469,20 +1418,15 @@ async fn make_mub_rub_os_deletes(
}
async fn make_2mub_rub_deletes(
lp_data: &[Vec<&str>],
preds: &[Predicate],
table_name: &str,
partition_key: &str,
) -> DbScenario {
let db = make_db().await.db;
// Chunk 1 is a RUB
write_lp(&db, &lp_data[0].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1491,11 +1435,11 @@ async fn make_2mub_rub_deletes(
.unwrap();
// Chunk 2 is a frozen MUB and chunk 3 is an open MUB
write_lp(&db, &lp_data[1].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
write_lp(&db, &lp_data[2].join("\n")).await;
// 2 MUB, 1 RUB, 0 OS
assert_eq!(count_mutable_buffer_chunks(&db), 2);
@@ -1503,9 +1447,9 @@ async fn make_2mub_rub_deletes(
assert_eq!(count_object_store_chunks(&db), 0);
// Issue 3 deletes
db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap();
DbScenario {
scenario_name: "Deleted data from 2 MUB, 1 RUB".into(),
@@ -1513,22 +1457,16 @@ async fn make_2mub_rub_deletes(
}
}
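The six helpers differ only in how far they push each chunk through the chunk lifecycle before issuing the deletes: a write lands in an open MUB, rollover_partition freezes it, a further move places it in the read buffer, and persisting plus unload_read_buffer leaves only the object-store copy. A self-contained toy model of that progression (the move-to-RUB and persist calls are hidden by the hunk boundaries, so the transition names here are assumptions that mirror the helpers, not IOx's real API):

// Toy state machine for the chunk stages these helpers exercise.
#[derive(Debug, PartialEq)]
enum ChunkStage {
    OpenMub,   // freshly written, still accepting writes
    FrozenMub, // after rollover_partition
    Rub,       // moved into the read buffer
    Os,        // persisted to object store; RUB copy unloaded
}

impl ChunkStage {
    fn rollover(self) -> Self {
        match self { ChunkStage::OpenMub => ChunkStage::FrozenMub, s => s }
    }
    fn move_to_read_buffer(self) -> Self {
        match self { ChunkStage::FrozenMub => ChunkStage::Rub, s => s }
    }
    fn persist_and_unload(self) -> Self {
        match self { ChunkStage::Rub => ChunkStage::Os, s => s }
    }
}

fn main() {
    // e.g. make_mub_rub_os_deletes: chunk 1 goes all the way to OS,
    // chunk 2 stops at RUB, and chunk 3 stays an open MUB.
    let chunk1 = ChunkStage::OpenMub
        .rollover()
        .move_to_read_buffer()
        .persist_and_unload();
    assert_eq!(chunk1, ChunkStage::Os);
    let chunk2 = ChunkStage::OpenMub.rollover().move_to_read_buffer();
    assert_eq!(chunk2, ChunkStage::Rub);
}

Because the three deletes are issued only after all chunks reach their target stages, each scenario checks that delete predicates are honored at every stage of the lifecycle.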
async fn make_2mub_os_deletes(
lp_data: &[Vec<&str>],
preds: &[Predicate],
table_name: &str,
partition_key: &str,
) -> DbScenario {
let db = make_db().await.db;
// chunk 1 is an OS chunk
write_lp(&db, &lp_data[0].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1545,22 +1483,22 @@ async fn make_2mub_os_deletes(
db.unload_read_buffer(table_name, partition_key, ChunkId::new(1))
.unwrap();
// Chunk 2 is a frozen MUB and chunk 3 is an open MUB
write_lp(&db, &lp_data[1].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
write_lp(&db, &lp_data[2].join("\n")).await;
// 2 MUB, 1 OS
assert_eq!(count_mutable_buffer_chunks(&db), 2);
assert_eq!(count_read_buffer_chunks(&db), 0);
assert_eq!(count_object_store_chunks(&db), 1);
// Issue 3 deletes
db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap();
DbScenario {
scenario_name: "Deleted data from 2 MUB, 1 OS".into(),
@@ -1568,22 +1506,16 @@ async fn make_2mub_os_deletes(
}
}
async fn make_2rub_os_deletes(
lp_data: &[Vec<&str>],
preds: &[Predicate],
table_name: &str,
partition_key: &str,
) -> DbScenario {
let db = make_db().await.db;
// chunk 1 is an OS chunk
write_lp(&db, &lp_data[0].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1600,8 +1532,8 @@ async fn make_2rub_os_deletes(
db.unload_read_buffer(table_name, partition_key, ChunkId::new(1))
.unwrap();
// Chunks 2 and 3 are RUBs
write_lp(&db, &lp_data[1].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1609,7 +1541,7 @@ async fn make_2rub_os_deletes(
.await
.unwrap();
write_lp(&db, &lp_data[2].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1617,15 +1549,15 @@ async fn make_2rub_os_deletes(
.await
.unwrap();
// 2 RUB, 1 OS
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 2);
assert_eq!(count_object_store_chunks(&db), 1);
// Issue 3 deletes
db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap();
DbScenario {
scenario_name: "Deleted data from 2 RUB, 1 OS".into(),
@@ -1633,22 +1565,16 @@ async fn make_2rub_os_deletes(
}
}
async fn make_rub_2os_deletes(
lp_data: &[Vec<&str>],
preds: &[Predicate],
table_name: &str,
partition_key: &str,
) -> DbScenario {
let db = make_db().await.db;
// chunks 1 and 2 are OS
write_lp(&db, &lp_data[0].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1665,8 +1591,8 @@ async fn make_rub_2os_deletes(
db.unload_read_buffer(table_name, partition_key, ChunkId::new(1))
.unwrap();
// Chunk 2
write_lp(&db, &lp_data[1].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1684,7 +1610,7 @@ async fn make_rub_2os_deletes(
.unwrap();
// Chunk 3 is a RUB
write_lp(&db, &lp_data[2].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1692,15 +1618,15 @@ async fn make_rub_2os_deletes(
.await
.unwrap();
// 1 RUB, 2 OS
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 2);
// Issue 3 deletes
db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap();
DbScenario {
scenario_name: "Deleted data from 1 RUB, 2 OS".into(),
@@ -1709,20 +1635,15 @@ async fn make_rub_2os_deletes(
}
async fn make_3os_deletes(
lp_data: &[Vec<&str>],
preds: &[Predicate],
table_name: &str,
partition_key: &str,
) -> DbScenario {
let db = make_db().await.db;
// All 3 chunks are OS
write_lp(&db, &lp_data[0].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1739,8 +1660,8 @@ async fn make_3os_deletes(
db.unload_read_buffer(table_name, partition_key, ChunkId::new(1))
.unwrap();
// Chunk 2
write_lp(&db, &lp_data[1].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1757,8 +1678,8 @@ async fn make_3os_deletes(
db.unload_read_buffer(table_name, partition_key, ChunkId::new(3))
.unwrap();
// Chunk 3
write_lp(&db, &lp_data[2].join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
@@ -1775,15 +1696,15 @@ async fn make_3os_deletes(
db.unload_read_buffer(table_name, partition_key, ChunkId::new(5))
.unwrap();
// 3 OS
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 0);
assert_eq!(count_object_store_chunks(&db), 3);
// Issue 3 deletes
db.delete("cpu", Arc::new(preds[0].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[1].clone())).await.unwrap();
db.delete("cpu", Arc::new(preds[2].clone())).await.unwrap();
DbScenario {
scenario_name: "Deleted data from 3 OS".into(),

View File

@@ -28,6 +28,8 @@ macro_rules! run_sql_test_case {
let physical_plan = planner.query(&sql, &ctx).expect("built plan successfully");
//println!(" --- Physical plan: {:#?}", physical_plan);
let results: Vec<RecordBatch> = ctx.collect(physical_plan).await.expect("Running plan");
assert_batches_sorted_eq!($EXPECTED_LINES, &results);
}
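The macro above plans the SQL with IOx's planner, executes the physical plan to RecordBatches, and compares the rendered table order-insensitively, so differing chunk orders cannot break a test. For readers unfamiliar with the flow, roughly the same round trip in plain DataFusion looks like this (a sketch using the present-day SessionContext API rather than the ExecutionContext-era API this commit was written against):

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::assert_batches_sorted_eq;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // A tiny in-memory table standing in for the "cpu" data.
    let batch = RecordBatch::try_from_iter(vec![
        ("bar", Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef),
        ("foo", Arc::new(StringArray::from(vec!["me", "you"])) as ArrayRef),
    ])?;
    let ctx = SessionContext::new();
    ctx.register_batch("cpu", batch)?;

    // Plan, execute, and compare the sorted rendered table, as
    // run_sql_test_case! does with IOx's own planner.
    let results = ctx.sql("SELECT * FROM cpu").await?.collect().await?;
    assert_batches_sorted_eq!(
        [
            "+-----+-----+",
            "| bar | foo |",
            "+-----+-----+",
            "| 1   | me  |",
            "| 2   | you |",
            "+-----+-----+",
        ],
        &results
    );
    Ok(())
}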
@@ -750,8 +752,10 @@ async fn sql_predicate_pushdown_correctness_13() {
#[tokio::test]
async fn sql_deduplicate_1() {
// The current expected output is wrong because deduplication is not available yet
// let sql =
// "select time, state, city, min_temp, max_temp, area from h2o order by time, state, city";
let sql =
"select time, state, city, min_temp, max_temp, area from h2o order by time, state, city";
"select time, state, city, min_temp, max_temp, area from h2o order by state, city, time";
let expected = vec![
"+--------------------------------+-------+---------+----------+----------+------+",
"| time | state | city | min_temp | max_temp | area |",
@@ -911,29 +915,24 @@ async fn sql_select_with_two_deleted_data_from_multi_exprs() {
}
#[tokio::test]
async fn sql_select_with_three_deletes_from_three_chunks() {
let expected = vec![
"+-----+-----+--------------------------------+",
"| bar | foo | time |",
"+-----+-----+--------------------------------+",
"| 1 | me | 1970-01-01T00:00:00.000000040Z |",
"| 1 | me | 1970-01-01T00:00:00.000000042Z |",
"| 1 | me | 1970-01-01T00:00:00.000000062Z |",
"| 3 | you | 1970-01-01T00:00:00.000000070Z |",
"| 4 | me | 1970-01-01T00:00:00.000000050Z |",
"| 5 | me | 1970-01-01T00:00:00.000000060Z |",
"| 7 | me | 1970-01-01T00:00:00.000000080Z |",
"+-----+-----+--------------------------------+",
];
// Data is deleted while in the MUB and then moved to RUB and OS
run_sql_test_case!(
scenarios::delete::ThreeDeleteThreeChunks {},
"SELECT * from cpu",
&expected
);
// Remaining rows after the three deletes, matching the expected table above:
// "cpu,foo=me bar=1 40",
// "cpu,foo=me bar=1 42",
// "cpu,foo=me bar=4 50",
// "cpu,foo=me bar=5 60",
// "cpu,foo=me bar=1 62",
// "cpu,foo=you bar=3 70",
// "cpu,foo=me bar=7 80",
}

View File

@@ -1556,24 +1556,24 @@ async fn test_delete() {
// Query cpu again with a selection predicate
let mut query_results = flight_client
.perform_query(
db_name.clone(),
r#"select * from cpu where cpu.region='west';"#,
)
.await
.unwrap();
let batches = query_results.to_batches().await.unwrap();
// result should be as above
assert_batches_sorted_eq!(&expected, &batches);
// Query cpu again with a different selection predicate
let mut query_results = flight_client
.perform_query(db_name.clone(), "select * from cpu where user!=21")
.await
.unwrap();
let batches = query_results.to_batches().await.unwrap();
// result should be nothing; an empty result renders as just the table borders
let expected = ["++", "++"];
assert_batches_sorted_eq!(&expected, &batches);
// ------------------------------------------
@@ -1588,7 +1588,7 @@ async fn test_delete() {
.delete(db_name.clone(), table, start, stop, pred)
.await;
assert!(del.is_err());
// Verify both existing tables still have the same data
// cpu
@@ -1618,7 +1618,7 @@ async fn test_delete() {
"| 99 | east | 1970-01-01T00:00:00.000000200Z |",
"+-------+--------+--------------------------------+",
];
assert_batches_sorted_eq!(&disk_expected, &batches);
}
#[tokio::test]