refactor: query_tests - port a few OG tests to NG and remove many more that were already ported (#4487)

* refactor: port a few OG tests to NG and remove many more that were already ported

* chore: Apply suggestions from code review

* chore: address review comments

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Nga Tran 2022-05-02 10:44:02 -04:00 committed by GitHub
parent 0682051366
commit 799480d34e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 77 additions and 1492 deletions

View File

@ -1,7 +0,0 @@
-- Test Setup: ChunkOrder
-- SQL: SELECT * from cpu order by time;
+--------+--------------------------------+------+
| region | time | user |
+--------+--------------------------------+------+
| west | 1970-01-01T00:00:00.000000100Z | 2 |
+--------+--------------------------------+------+

View File

@ -1,5 +0,0 @@
-- Test that deduplication respects chunk ordering
-- IOX_SETUP: ChunkOrder
-- query data
SELECT * from cpu order by time;

View File

@ -1,4 +1,4 @@
-- Test Setup: OldOneMeasurementFourChunksWithDuplicates
-- Test Setup: OneMeasurementFourChunksWithDuplicates
-- SQL: explain select time, state, city, min_temp, max_temp, area from h2o order by time, state, city;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |

View File

@ -1,5 +1,5 @@
-- Test for predicate push down explains
-- IOX_SETUP: OldOneMeasurementFourChunksWithDuplicates
-- IOX_SETUP: OneMeasurementFourChunksWithDuplicates
-- Plan with order by
explain select time, state, city, min_temp, max_temp, area from h2o order by time, state, city;

View File

@ -1,13 +0,0 @@
-- Test Setup: OldTwoMeasurementsManyFieldsOneRubChunk
-- SQL: EXPLAIN SELECT count(*) from h2o;
+---------------+-------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------+
| logical_plan | Projection: #COUNT(UInt8(1)) |
| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] |
| | TableScan: h2o projection=Some([0]) |
| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] |
| | ProjectionExec: expr=[3 as COUNT(UInt8(1))] |
| | EmptyExec: produce_one_row=true |
| | |
+---------------+-------------------------------------------------------------+

View File

@ -1,10 +0,0 @@
-- Demonstrate plans that are not optimized using statistics
-- IOX_SETUP: OldTwoMeasurementsManyFieldsOneRubChunk
-- This plan should scan data as it reads from a chunk with delete predicates
EXPLAIN SELECT count(*) from h2o;
-- NOTE: This test should have "IOX_SETUP: ThreeDeleteThreeChunks" but, because of bug https://github.com/influxdata/influxdb_iox/issues/2745,
-- we make it OldTwoMeasurementsManyFieldsOneRubChunk instead
-- Also, the query should be "EXPLAIN SELECT count(*) from cpu;"

View File

@ -1,37 +0,0 @@
-- Test Setup: OldTwoMeasurementsManyFieldsOneRubChunk
-- SQL: SELECT * from information_schema.tables where table_schema = 'system';
-- Results After Sorting
+---------------+--------------+---------------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------+---------------------+------------+
| public | system | chunk_columns | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | system | persistence_windows | BASE TABLE |
| public | system | queries | BASE TABLE |
+---------------+--------------+---------------------+------------+
-- SQL: SELECT partition_key, table_name, storage, memory_bytes, row_count from system.chunks;
-- Results After Sorting
+---------------+------------+-------------------+--------------+-----------+
| partition_key | table_name | storage | memory_bytes | row_count |
+---------------+------------+-------------------+--------------+-----------+
| 1970-01-01T00 | h2o | ReadBuffer | 4456 | 3 |
| 1970-01-01T00 | o2 | OpenMutableBuffer | 1867 | 2 |
+---------------+------------+-------------------+--------------+-----------+
-- SQL: SELECT * from system.columns;
-- Results After Sorting
+---------------+------------+-------------+-------------+---------------+
| partition_key | table_name | column_name | column_type | influxdb_type |
+---------------+------------+-------------+-------------+---------------+
| 1970-01-01T00 | h2o | city | String | Tag |
| 1970-01-01T00 | h2o | other_temp | F64 | Field |
| 1970-01-01T00 | h2o | state | String | Tag |
| 1970-01-01T00 | h2o | temp | F64 | Field |
| 1970-01-01T00 | h2o | time | I64 | Timestamp |
| 1970-01-01T00 | o2 | city | String | Tag |
| 1970-01-01T00 | o2 | reading | F64 | Field |
| 1970-01-01T00 | o2 | state | String | Tag |
| 1970-01-01T00 | o2 | temp | F64 | Field |
| 1970-01-01T00 | o2 | time | I64 | Timestamp |
+---------------+------------+-------------+-------------+---------------+

View File

@ -1,19 +0,0 @@
-- IOX_SETUP: OldTwoMeasurementsManyFieldsOneRubChunk
--
-- system tables reflect the state of chunks, so don't run them
-- with different chunk configurations.
--
-- validate we have access to information schema for listing system tables
-- IOX_COMPARE: sorted
SELECT * from information_schema.tables where table_schema = 'system';
-- ensures the tables / plumbing are hooked up (so no need to
-- test timestamps, etc)
-- IOX_COMPARE: sorted
SELECT partition_key, table_name, storage, memory_bytes, row_count from system.chunks;
-- IOX_COMPARE: sorted
SELECT * from system.columns;
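The `-- IOX_COMPARE: sorted` directive here (and the matching "Results After Sorting" marker in the expected files) exists because scenarios may return rows in different orders. A minimal sketch of the idea, not the actual `Runner` implementation:

```rust
// Hypothetical sketch of what "-- IOX_COMPARE: sorted" implies: sort the
// produced table lines before diffing them against the expected output, so
// row-order differences between scenarios don't fail the test.
fn normalize_output(mut lines: Vec<String>, sorted: bool) -> Vec<String> {
    if sorted {
        lines.sort();
    }
    lines
}

fn main() {
    let got = vec!["| o2  | ... |".to_string(), "| h2o | ... |".to_string()];
    let expected = vec!["| h2o | ... |".to_string(), "| o2  | ... |".to_string()];
    assert_eq!(normalize_output(got, true), expected);
}
```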

View File

@ -1,21 +0,0 @@
-- Test Setup: OldTwoMeasurementsManyFieldsTwoChunks
-- SQL: SELECT partition_key, table_name, column_name, storage, row_count, null_count, min_value, max_value, memory_bytes from system.chunk_columns;
-- Results After Sorting
+---------------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+
| partition_key | table_name | column_name | storage | row_count | null_count | min_value | max_value | memory_bytes |
+---------------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+
| 1970-01-01T00 | h2o | city | OpenMutableBuffer | 1 | 0 | Boston | Boston | 317 |
| 1970-01-01T00 | h2o | city | ReadBuffer | 2 | 0 | Boston | Boston | 327 |
| 1970-01-01T00 | h2o | other_temp | OpenMutableBuffer | 1 | 0 | 72.4 | 72.4 | 305 |
| 1970-01-01T00 | h2o | other_temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 471 |
| 1970-01-01T00 | h2o | state | OpenMutableBuffer | 1 | 0 | CA | CA | 317 |
| 1970-01-01T00 | h2o | state | ReadBuffer | 2 | 0 | MA | MA | 315 |
| 1970-01-01T00 | h2o | temp | ReadBuffer | 2 | 1 | 70.4 | 70.4 | 471 |
| 1970-01-01T00 | h2o | time | OpenMutableBuffer | 1 | 0 | 350 | 350 | 305 |
| 1970-01-01T00 | h2o | time | ReadBuffer | 2 | 0 | 50 | 250 | 110 |
| 1970-01-01T00 | o2 | city | OpenMutableBuffer | 2 | 1 | Boston | Boston | 317 |
| 1970-01-01T00 | o2 | reading | OpenMutableBuffer | 2 | 1 | 51 | 51 | 305 |
| 1970-01-01T00 | o2 | state | OpenMutableBuffer | 2 | 0 | CA | MA | 321 |
| 1970-01-01T00 | o2 | temp | OpenMutableBuffer | 2 | 0 | 53.4 | 79 | 305 |
| 1970-01-01T00 | o2 | time | OpenMutableBuffer | 2 | 0 | 50 | 300 | 305 |
+---------------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+

View File

@ -1,12 +0,0 @@
-- IOX_SETUP: OldTwoMeasurementsManyFieldsTwoChunks
--
-- system tables reflect the state of chunks, so don't run them
-- with different chunk configurations.
--
-- ensures the tables / plumbing are hooked up (so no need to
-- test timestamps, etc)
-- IOX_COMPARE: sorted
SELECT partition_key, table_name, column_name, storage, row_count, null_count, min_value, max_value, memory_bytes from system.chunk_columns;

View File

@ -1,31 +0,0 @@
-- Test Setup: OldTwoMeasurementsManyFieldsOneRubChunk
-- SQL: EXPLAIN SELECT count(*) from h2o;
+---------------+-------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------+
| logical_plan | Projection: #COUNT(UInt8(1)) |
| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] |
| | TableScan: h2o projection=Some([0]) |
| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] |
| | ProjectionExec: expr=[3 as COUNT(UInt8(1))] |
| | EmptyExec: produce_one_row=true |
| | |
+---------------+-------------------------------------------------------------+
-- SQL: EXPLAIN SELECT count(*) from h2o where temp > 70.0 and temp < 72.0;
+---------------+---------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: #COUNT(UInt8(1)) |
| | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] |
| | Filter: #h2o.temp > Float64(70) AND #h2o.temp < Float64(72) |
| | TableScan: h2o projection=Some([3]), partial_filters=[#h2o.temp > Float64(70), #h2o.temp < Float64(72)] |
| physical_plan | ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))] |
| | HashAggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))] |
| | CoalescePartitionsExec |
| | HashAggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))] |
| | CoalesceBatchesExec: target_batch_size=500 |
| | FilterExec: temp@0 > 70 AND temp@0 < 72 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=h2o, chunks=1 predicate=Predicate exprs: [#temp > Float64(70), #temp < Float64(72)] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------+

View File

@ -1,9 +0,0 @@
-- Demonstrate plans that are optimized using statistics
-- IOX_SETUP: OldTwoMeasurementsManyFieldsOneRubChunk
-- This plan should not scan data as it reads from a RUB chunk with no duplicates and no soft-deleted data
EXPLAIN SELECT count(*) from h2o;
-- However, this plan will still need to scan data given the predicate
EXPLAIN SELECT count(*) from h2o where temp > 70.0 and temp < 72.0;
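Taken together, `old_stats_plans.sql` and `no_stats_plans.sql` demonstrate one optimization: when a chunk carries exact statistics (no duplicates, no soft-deleted rows), `COUNT(*)` collapses to a constant (the `3 as COUNT(UInt8(1))` over `EmptyExec` in the plan above), while delete predicates or row-level filters force a scan. A self-contained sketch of that decision, using hypothetical types rather than the IOx/DataFusion statistics API:

```rust
// Hypothetical chunk statistics; the real IOx/DataFusion types differ.
struct ChunkStats {
    row_count: Option<u64>,       // exact row count, if known
    has_deletes: bool,            // soft-deleted rows invalidate the count
    may_contain_duplicates: bool, // overlapping chunks invalidate the count
}

/// Answer COUNT(*) from metadata when every chunk's count is exact;
/// otherwise return None and fall back to scanning the data.
fn count_star_from_stats(chunks: &[ChunkStats]) -> Option<u64> {
    let mut total: u64 = 0;
    for c in chunks {
        if c.has_deletes || c.may_contain_duplicates {
            return None;
        }
        total += c.row_count?;
    }
    Some(total)
}

fn main() {
    let chunks = [ChunkStats {
        row_count: Some(3),
        has_deletes: false,
        may_contain_duplicates: false,
    }];
    // matches the constant "3 as COUNT(UInt8(1))" in the plan above
    assert_eq!(count_star_from_stats(&chunks), Some(3));
}
```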

View File

@ -32,20 +32,6 @@ async fn test_cases_basic_sql() {
.expect("flush worked");
}
#[tokio::test]
// Tests from "chunk_order.sql",
async fn test_cases_chunk_order_sql() {
let input_path = Path::new("cases").join("in").join("chunk_order.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "delete_all.sql",
async fn test_cases_delete_all_sql() {
@ -131,65 +117,9 @@ async fn test_cases_new_sql_system_tables_sql() {
}
#[tokio::test]
// Tests from "no_stats_plans.sql",
async fn test_cases_no_stats_plans_sql() {
let input_path = Path::new("cases").join("in").join("no_stats_plans.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "old_duplicates.sql",
async fn test_cases_old_duplicates_sql() {
let input_path = Path::new("cases").join("in").join("old_duplicates.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "old_sql_system_tables.sql",
async fn test_cases_old_sql_system_tables_sql() {
let input_path = Path::new("cases").join("in").join("old_sql_system_tables.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "old_sql_system_tables2.sql",
async fn test_cases_old_sql_system_tables2_sql() {
let input_path = Path::new("cases").join("in").join("old_sql_system_tables2.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "old_stats_plans.sql",
async fn test_cases_old_stats_plans_sql() {
let input_path = Path::new("cases").join("in").join("old_stats_plans.sql");
// Tests from "duplicates.sql",
async fn test_cases_duplicates_sql() {
let input_path = Path::new("cases").join("in").join("duplicates.sql");
let mut runner = Runner::new();
runner
.run(input_path)

View File

@ -52,12 +52,8 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
register_setup!(TwoMeasurementsManyFields),
register_setup!(TwoMeasurementsPredicatePushDown),
register_setup!(TwoMeasurementsManyFieldsOneChunk),
register_setup!(OldTwoMeasurementsManyFieldsTwoChunks),
register_setup!(OldTwoMeasurementsManyFieldsOneRubChunk),
register_setup!(OneMeasurementFourChunksWithDuplicates),
register_setup!(OldOneMeasurementFourChunksWithDuplicates),
register_setup!(OneMeasurementAllChunksDropped),
register_setup!(ChunkOrder),
register_setup!(ThreeDeleteThreeChunks),
register_setup!(OneDeleteSimpleExprOneChunkDeleteAll),
register_setup!(OneDeleteSimpleExprOneChunk),
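For reference, a sketch of how a registry like `get_all_setups()` can map a setup's type name to a trait object; the real `register_setup!` macro in IOx may differ in detail:

```rust
use std::{collections::HashMap, sync::Arc};

trait DbSetup: Send + Sync {}

// Hypothetical expansion: pair the type's name with a shared instance.
macro_rules! register_setup {
    ($name:ident) => {
        (stringify!($name).to_string(), Arc::new($name {}) as Arc<dyn DbSetup>)
    };
}

struct OneMeasurementFourChunksWithDuplicates {}
impl DbSetup for OneMeasurementFourChunksWithDuplicates {}

fn main() {
    let setups: HashMap<String, Arc<dyn DbSetup>> =
        HashMap::from([register_setup!(OneMeasurementFourChunksWithDuplicates)]);
    assert!(setups.contains_key("OneMeasurementFourChunksWithDuplicates"));
}
```

Under this scheme, dropping a setup (as this diff does for the `Old*` variants) is just deleting its `register_setup!` line.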

View File

@ -7,11 +7,7 @@ use async_trait::async_trait;
use super::util::{make_n_chunks_scenario_new, ChunkDataNew, DeleteTimeNew, PredNew};
use super::{DbScenario, DbSetup};
use crate::scenarios::util::{
all_scenarios_for_one_chunk, make_different_stage_chunks_with_deletes_scenario_old,
make_os_chunks_and_then_compact_with_different_scenarios_with_delete, ChunkDataOld,
ChunkStageOld,
};
use crate::scenarios::util::all_scenarios_for_one_chunk;
// =========================================================================================================================
// DELETE TEST SETUPS: chunk lp data, how many chunks, their types, how many delete predicates and when they happen
@ -255,225 +251,49 @@ impl DbSetup for ThreeDeleteThreeChunks {
)],
};
// ----------------------
// 3 chunks: MUB, RUB, OS
let lp = vec![
ChunkDataOld {
lp_lines: lp_lines_1.clone(),
chunk_stage: ChunkStageOld::Os,
//let preds = vec![&pred1, &pred2, &pred3];
let preds = vec![
PredNew {
predicate: &pred1,
delete_time: DeleteTimeNew::End,
},
ChunkDataOld {
lp_lines: lp_lines_2.clone(),
chunk_stage: ChunkStageOld::Rub,
PredNew {
predicate: &pred2,
delete_time: DeleteTimeNew::End,
},
ChunkDataOld {
lp_lines: lp_lines_3.clone(),
chunk_stage: ChunkStageOld::Mubo,
PredNew {
predicate: &pred3,
delete_time: DeleteTimeNew::End,
},
];
let preds = vec![&pred1, &pred2, &pred3];
let scenario_mub_rub_os = make_different_stage_chunks_with_deletes_scenario_old(
lp,
preds.clone(),
table_name,
partition_key,
)
.await;
// ----------------------
// 3 chunks: 1 MUB open, 1 MUB frozen, 1 RUB
let lp = vec![
ChunkDataOld {
lp_lines: lp_lines_1.clone(),
chunk_stage: ChunkStageOld::Rub,
},
ChunkDataOld {
lp_lines: lp_lines_2.clone(),
chunk_stage: ChunkStageOld::Mubf,
},
ChunkDataOld {
lp_lines: lp_lines_3.clone(),
chunk_stage: ChunkStageOld::Mubo,
},
];
let scenario_2mub_rub = make_different_stage_chunks_with_deletes_scenario_old(
lp,
preds.clone(),
table_name,
partition_key,
)
.await;
// ----------------------
// 3 chunks: 2 MUB, 1 OS
let lp = vec![
ChunkDataOld {
lp_lines: lp_lines_1.clone(),
chunk_stage: ChunkStageOld::Os,
},
ChunkDataOld {
lp_lines: lp_lines_2.clone(),
chunk_stage: ChunkStageOld::Mubf,
},
ChunkDataOld {
lp_lines: lp_lines_3.clone(),
chunk_stage: ChunkStageOld::Mubo,
},
];
let scenario_2mub_os = make_different_stage_chunks_with_deletes_scenario_old(
lp,
preds.clone(),
table_name,
partition_key,
)
.await;
// ----------------------
// 3 chunks: 2 RUB, 1 OS
let lp = vec![
ChunkDataOld {
lp_lines: lp_lines_1.clone(),
chunk_stage: ChunkStageOld::Os,
},
ChunkDataOld {
lp_lines: lp_lines_2.clone(),
chunk_stage: ChunkStageOld::Rub,
},
ChunkDataOld {
lp_lines: lp_lines_3.clone(),
chunk_stage: ChunkStageOld::Rub,
},
];
let scenario_2rub_os = make_different_stage_chunks_with_deletes_scenario_old(
lp,
preds.clone(),
table_name,
partition_key,
)
.await;
// ----------------------
// 3 chunks: RUB, 2 OS
let lp = vec![
ChunkDataOld {
lp_lines: lp_lines_1.clone(),
chunk_stage: ChunkStageOld::Os,
},
ChunkDataOld {
lp_lines: lp_lines_2.clone(),
chunk_stage: ChunkStageOld::Os,
},
ChunkDataOld {
lp_lines: lp_lines_3.clone(),
chunk_stage: ChunkStageOld::Rub,
},
];
let scenario_rub_2os = make_different_stage_chunks_with_deletes_scenario_old(
lp,
preds.clone(),
table_name,
partition_key,
)
.await;
// ----------------------
// 3 chunks: 3 OS
let lp = vec![
ChunkDataOld {
lp_lines: lp_lines_1.clone(),
chunk_stage: ChunkStageOld::Os,
},
ChunkDataOld {
lp_lines: lp_lines_2.clone(),
chunk_stage: ChunkStageOld::Os,
},
ChunkDataOld {
lp_lines: lp_lines_3.clone(),
chunk_stage: ChunkStageOld::Os,
},
];
let scenario_3os = make_different_stage_chunks_with_deletes_scenario_old(
lp,
preds.clone(),
table_name,
partition_key,
)
.await;
// ----------------------
// A few more scenarios that compact all 3 OS chunks or the first 2 OS chunks
// with deletes before or after the OS compaction
let compact_os_scenarios =
make_os_chunks_and_then_compact_with_different_scenarios_with_delete(
vec![lp_lines_1.clone(), lp_lines_2.clone(), lp_lines_3.clone()],
preds.clone(),
table_name,
// Scenarios
// All three deletes will be applied to every chunk but, due to their predicates,
// only the appropriate data is deleted
let scenarios = make_n_chunks_scenario_new(&[
ChunkDataNew {
lp_lines: lp_lines_1,
preds: preds.clone(),
delete_table_name: Some(table_name),
partition_key,
)
.await;
// return scenarios to run queries
let mut scenarios = vec![
scenario_mub_rub_os,
scenario_2mub_rub,
scenario_2mub_os,
scenario_2rub_os,
scenario_rub_2os,
scenario_3os,
];
scenarios.extend(compact_os_scenarios.into_iter());
scenarios.append(
&mut make_n_chunks_scenario_new(&[
ChunkDataNew {
lp_lines: lp_lines_1,
preds: vec![
PredNew {
predicate: &pred1,
delete_time: DeleteTimeNew::End,
},
PredNew {
predicate: &pred2,
delete_time: DeleteTimeNew::End,
},
PredNew {
predicate: &pred3,
delete_time: DeleteTimeNew::End,
},
],
delete_table_name: Some(table_name),
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines_2,
preds: vec![
PredNew {
predicate: &pred2,
delete_time: DeleteTimeNew::End,
},
PredNew {
predicate: &pred3,
delete_time: DeleteTimeNew::End,
},
],
delete_table_name: Some(table_name),
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines_3,
preds: vec![PredNew {
predicate: &pred3,
delete_time: DeleteTimeNew::End,
}],
delete_table_name: Some(table_name),
partition_key,
..Default::default()
},
])
.await,
);
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines_2,
preds: preds.clone(),
delete_table_name: Some(table_name),
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines_3,
preds,
delete_table_name: Some(table_name),
partition_key,
..Default::default()
},
])
.await;
scenarios
}
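The comment above notes that all three deletes are applied to every chunk. This is safe because each predicate only removes the rows it actually matches; rows outside a predicate's time range (real delete predicates also carry column expressions, omitted here) survive. An illustrative sketch, not the IOx `DeletePredicate` type:

```rust
// Simplified predicate: only a time range, no column expressions.
#[derive(Clone, Copy)]
struct TimeRange {
    start: i64,
    end: i64,
}

/// A row survives if no predicate's time range covers its timestamp.
fn survives(row_time: i64, predicates: &[TimeRange]) -> bool {
    !predicates.iter().any(|p| p.start <= row_time && row_time <= p.end)
}

fn main() {
    let preds = [TimeRange { start: 0, end: 15 }, TimeRange { start: 30, end: 50 }];
    assert!(!survives(10, &preds)); // covered by the first predicate: deleted
    assert!(survives(20, &preds)); // outside every range: kept
}
```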

View File

@ -9,100 +9,16 @@ use db::{
utils::{
count_mutable_buffer_chunks, count_object_store_chunks, count_read_buffer_chunks, make_db,
},
LockableChunk, LockablePartition,
};
use query::{frontend::sql::SqlQueryPlanner, QueryChunk};
use crate::scenarios::util::{make_n_chunks_scenario_new, ChunkDataNew};
use super::{
util::{
all_scenarios_for_one_chunk, make_one_rub_or_parquet_chunk_scenario,
make_two_chunk_scenarios, ChunkStageNew,
},
util::{all_scenarios_for_one_chunk, make_two_chunk_scenarios, ChunkStageNew},
DbScenario, DbSetup,
};
/// No data
#[derive(Debug)]
pub struct ChunkOrder {}
#[async_trait]
impl DbSetup for ChunkOrder {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let table_name = "cpu";
let db = make_db().await.db;
// create first chunk: data->MUB->RUB
write_lp(&db, "cpu,region=west user=1 100");
assert_eq!(count_mutable_buffer_chunks(&db), 1);
assert_eq!(count_read_buffer_chunks(&db), 0);
assert_eq!(count_object_store_chunks(&db), 0);
db.compact_partition(table_name, partition_key)
.await
.unwrap();
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 0);
// We prepare a persist, then drop the locks, perform another write, re-acquire locks
// and start a persist operation. In practice the lifecycle doesn't drop the locks
// before starting the persist operation, but this allows us to deterministically
// interleave a persist with a write
let partition = db.lockable_partition(table_name, partition_key).unwrap();
let (chunks, flush_handle) = {
let partition = partition.read();
let chunks = LockablePartition::chunks(&partition);
let mut partition = partition.upgrade();
let flush_handle = LockablePartition::prepare_persist(&mut partition, true).unwrap();
(chunks, flush_handle)
};
// create second chunk: data->MUB
write_lp(&db, "cpu,region=west user=2 100");
assert_eq!(count_mutable_buffer_chunks(&db), 1);
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 0);
let tracker = {
let partition = partition.write();
let chunks = chunks.iter().map(|chunk| chunk.write()).collect();
LockablePartition::persist_chunks(partition, chunks, flush_handle).unwrap()
};
tracker.join().await;
assert!(tracker.get_status().result().unwrap().success());
assert_eq!(count_mutable_buffer_chunks(&db), 1);
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 1);
// Now we have the following chunks (same partition and table):
//
// | ID | order | tag: region | field: user | time |
// | -- | ----- | ----------- | ----------- | ---- |
// | 1 | 1 | "west" | 2 | 100 |
// | 2 | 0 | "west" | 1 | 100 |
//
// The result after deduplication should be:
//
// | tag: region | field: user | time |
// | ----------- | ----------- | ---- |
// | "west" | 2 | 100 |
//
// So the query engine must use `order` as a primary key to sort chunks, NOT `id`.
let scenario = DbScenario {
scenario_name: "chunks where chunk ID alone cannot be used for ordering".into(),
db,
};
vec![scenario]
}
}
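The table in the comment above is the heart of this scenario: two chunks carry a row with the same primary key (`region`, `time`), and deduplication must prefer the chunk with the higher `order`, not the higher `id`. A self-contained sketch of that rule over plain data structures, not IOx code:

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct Row {
    region: &'static str,
    time: i64,
    user: f64,
}

/// For each primary key (region, time), keep the row from the chunk
/// with the highest order.
fn deduplicate(chunks: &[(u32 /* chunk order */, Vec<Row>)]) -> Vec<Row> {
    let mut latest: HashMap<(&'static str, i64), (u32, Row)> = HashMap::new();
    for (order, rows) in chunks {
        for row in rows {
            let key = (row.region, row.time);
            let keep_existing = matches!(latest.get(&key), Some((o, _)) if o >= order);
            if !keep_existing {
                latest.insert(key, (*order, row.clone()));
            }
        }
    }
    latest.into_values().map(|(_, row)| row).collect()
}

fn main() {
    // chunk id 2 has order 0, chunk id 1 has order 1 (as in the comment above)
    let chunks = vec![
        (0, vec![Row { region: "west", time: 100, user: 1.0 }]),
        (1, vec![Row { region: "west", time: 100, user: 2.0 }]),
    ];
    // the row from the higher-order chunk wins: user == 2
    assert_eq!(
        deduplicate(&chunks),
        vec![Row { region: "west", time: 100, user: 2.0 }]
    );
}
```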
#[derive(Debug)]
pub struct MeasurementWithMaxTime {}
#[async_trait]
@ -121,6 +37,8 @@ impl DbSetup for MeasurementWithMaxTime {
}
}
// TODO: rewrite this for NG. Need to see whether we will need to test an empty DB and, if so, how
// <https://github.com/influxdata/influxdb_iox/issues/4488>
/// No data
#[derive(Debug)]
pub struct NoData {}
@ -496,7 +414,7 @@ impl DbSetup for TwoMeasurementsPredicatePushDown {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let lp_lines = vec![
let lp_lines1 = vec![
"restaurant,town=andover count=40000u,system=5.0 100",
"restaurant,town=reading count=632u,system=5.0 120",
"restaurant,town=bedford count=189u,system=7.0 110",
@ -504,11 +422,13 @@ impl DbSetup for TwoMeasurementsPredicatePushDown {
"restaurant,town=lexington count=372u,system=5.0 100",
"restaurant,town=lawrence count=872u,system=6.0 110",
"restaurant,town=reading count=632u,system=6.0 130",
];
let lp_lines2 = vec![
"school,town=reading count=17u,system=6.0 150",
"school,town=andover count=25u,system=6.0 160",
];
make_one_rub_or_parquet_chunk_scenario(partition_key, &lp_lines.join("\n")).await
make_two_chunk_scenarios(partition_key, &lp_lines1.join("\n"), &lp_lines2.join("\n")).await
}
}
@ -598,75 +518,6 @@ impl DbSetup for TwoMeasurementsManyFieldsOneChunk {
}
}
#[derive(Debug)]
/// This has a single chunk for queries that check the state of the system
///
/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables.
pub struct OldTwoMeasurementsManyFieldsOneRubChunk {}
#[async_trait]
impl DbSetup for OldTwoMeasurementsManyFieldsOneRubChunk {
async fn make(&self) -> Vec<DbScenario> {
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
"h2o,state=CA,city=Boston other_temp=72.4 350",
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
"o2,state=CA temp=79.0 300",
];
write_lp(&db, &lp_lines.join("\n"));
// move all data to RUB
db.compact_open_chunk("h2o", partition_key).await.unwrap();
vec![DbScenario {
scenario_name: "Data in single chunk of read buffer".into(),
db,
}]
}
}
#[derive(Debug)]
/// This has two chunks for queries that check the state of the system
///
/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables.
pub struct OldTwoMeasurementsManyFieldsTwoChunks {}
#[async_trait]
impl DbSetup for OldTwoMeasurementsManyFieldsTwoChunks {
async fn make(&self) -> Vec<DbScenario> {
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
];
write_lp(&db, &lp_lines.join("\n"));
db.compact_partition("h2o", partition_key).await.unwrap();
let lp_lines = vec![
"h2o,state=CA,city=Boston other_temp=72.4 350",
"o2,state=MA,city=Boston temp=53.4,reading=51 50",
"o2,state=CA temp=79.0 300",
];
write_lp(&db, &lp_lines.join("\n"));
assert_eq!(count_mutable_buffer_chunks(&db), 2);
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 0);
vec![DbScenario {
scenario_name: "Data in two open mutable buffer chunks per table and read buffer"
.into(),
db,
}]
}
}
#[derive(Debug)]
/// This has two chunks for queries that check the state of the system
///
@ -755,8 +606,6 @@ pub struct OneMeasurementFourChunksWithDuplicates {}
#[async_trait]
impl DbSetup for OneMeasurementFourChunksWithDuplicates {
async fn make(&self) -> Vec<DbScenario> {
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
// Chunk 1:
@ -768,8 +617,6 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
"h2o,state=MA,city=Boston max_temp=75.4 250",
"h2o,state=MA,city=Andover max_temp=69.2, 250",
];
write_lp(&db, &lp_lines1.join("\n"));
db.compact_open_chunk("h2o", partition_key).await.unwrap();
// Chunk 2: overlaps with chunk 1
// . time range: 150 - 300
@ -782,8 +629,6 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 350",
];
write_lp(&db, &lp_lines2.join("\n"));
db.compact_open_chunk("h2o", partition_key).await.unwrap();
// Chunk 3: no overlap
// . time range: 400 - 500
@ -796,8 +641,6 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=77.0,max_temp=90.7 450",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=88.2 500",
];
write_lp(&db, &lp_lines3.join("\n"));
db.compact_open_chunk("h2o", partition_key).await.unwrap();
// Chunk 4: no overlap
// . time range: 600 - 700
@ -810,109 +653,35 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 650",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
];
write_lp(&db, &lp_lines4.join("\n"));
db.compact_open_chunk("h2o", partition_key).await.unwrap();
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 4);
assert_eq!(count_object_store_chunks(&db), 0);
let scenarios = make_n_chunks_scenario_new(&[
ChunkDataNew {
lp_lines: lp_lines1,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines2,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines3,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines4,
partition_key,
..Default::default()
},
])
.await;
let mut scenarios = vec![DbScenario {
scenario_name: "Data in four chunks with duplicates".into(),
db,
}];
scenarios.append(
&mut make_n_chunks_scenario_new(&[
ChunkDataNew {
lp_lines: lp_lines1,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines2,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines3,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines4,
partition_key,
..Default::default()
},
])
.await,
);
scenarios
}
}
#[derive(Debug)]
/// Setup for four chunks with duplicates for deduplicate tests
///
/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables.
pub struct OldOneMeasurementFourChunksWithDuplicates {}
#[async_trait]
impl DbSetup for OldOneMeasurementFourChunksWithDuplicates {
async fn make(&self) -> Vec<DbScenario> {
let scenarios: Vec<_> = OneMeasurementFourChunksWithDuplicates {}
.make()
.await
.into_iter()
.filter(|s| s.scenario_name == "Data in four chunks with duplicates")
.collect();
assert_eq!(scenarios.len(), 1);
scenarios
}
}
#[derive(Debug)]
/// This has a single scenario with all the life cycle operations to
/// test queries that depend on that
///
/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables.
pub struct OldTwoMeasurementsManyFieldsLifecycle {}
#[async_trait]
impl DbSetup for OldTwoMeasurementsManyFieldsLifecycle {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
let db = make_db().await.db;
write_lp(
&db,
&vec![
"h2o,state=MA,city=Boston temp=70.4 50",
"h2o,state=MA,city=Boston other_temp=70.4 250",
]
.join("\n"),
);
db.compact_open_chunk("h2o", partition_key).await.unwrap();
db.persist_partition("h2o", partition_key, true)
.await
.unwrap();
write_lp(
&db,
&vec!["h2o,state=CA,city=Boston other_temp=72.4 350"].join("\n"),
);
assert_eq!(count_mutable_buffer_chunks(&db), 1);
assert_eq!(count_read_buffer_chunks(&db), 1);
assert_eq!(count_object_store_chunks(&db), 1);
vec![DbScenario {
scenario_name: "Data in parquet, RUB, and MUB".into(),
db,
}]
}
}
#[derive(Debug)]
pub struct OneMeasurementManyFields {}
#[async_trait]

View File

@ -3,17 +3,11 @@ use super::DbScenario;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types::{chunk_metadata::ChunkId, delete_predicate::DeletePredicate};
use data_types::delete_predicate::DeletePredicate;
use data_types2::{
IngesterQueryRequest, NonEmptyString, PartitionId, Sequence, SequenceNumber, SequencerId,
TombstoneId,
};
use db::test_helpers::chunk_ids_rub;
use db::{
test_helpers::write_lp,
utils::{count_mub_table_chunks, count_os_table_chunks, count_rub_table_chunks, make_db},
Db,
};
use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
use futures::StreamExt;
use generated_types::influxdata::iox::ingester::v1::{
@ -33,7 +27,6 @@ use querier::{
IngesterConnectionImpl, IngesterFlightClient, IngesterFlightClientError,
IngesterFlightClientQueryData, QuerierCatalogCache, QuerierNamespace,
};
use query::QueryChunk;
use schema::selection::Selection;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
@ -45,14 +38,6 @@ use std::{fmt::Display, sync::Arc};
// & when delete predicates are applied
// STRUCTs & ENUMs
#[derive(Debug, Clone)]
pub struct ChunkDataOld<'a> {
/// Line protocol data of this chunk
pub lp_lines: Vec<&'a str>,
/// which stage this chunk will be created
pub chunk_stage: ChunkStageOld,
}
#[derive(Debug, Clone, Default)]
pub struct ChunkDataNew<'a, 'b> {
/// Line protocol data of this chunk
@ -103,39 +88,6 @@ impl<'a, 'b> ChunkDataNew<'a, 'b> {
}
}
#[derive(Debug, Clone, Copy)]
pub enum ChunkStageOld {
/// Open MUB
Mubo,
/// Frozen MUB
Mubf,
/// RUB without OS
Rub,
/// both RUB and OS of the chunk exist
RubOs,
/// OS only
Os,
}
impl Display for ChunkStageOld {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Mubo => write!(f, "Open MUB"),
Self::Mubf => write!(f, "Frozen MUB"),
Self::Rub => write!(f, "RUB"),
Self::RubOs => write!(f, "RUB & OS"),
Self::Os => write!(f, "OS"),
}
}
}
impl ChunkStageOld {
/// return the list of all chunk types
pub fn all() -> Vec<Self> {
vec![Self::Mubo, Self::Mubf, Self::Rub, Self::RubOs, Self::Os]
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkStageNew {
/// In parquet file.
@ -179,14 +131,12 @@ impl ChunkStageNew {
#[derive(Debug, Clone, Copy)]
pub enum ChunkStage {
Old(ChunkStageOld),
New(ChunkStageNew),
}
impl Display for ChunkStage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Old(stage) => write!(f, "Old: {}", stage),
Self::New(stage) => write!(f, "New: {}", stage),
}
}
@ -195,22 +145,13 @@ impl Display for ChunkStage {
impl ChunkStage {
/// return the list of all chunk types
pub fn all() -> Vec<Self> {
ChunkStageOld::all()
ChunkStageNew::all()
.into_iter()
.map(ChunkStage::Old)
.chain(ChunkStageNew::all().into_iter().map(ChunkStage::New))
.map(ChunkStage::New)
.collect()
}
}
#[derive(Debug, Clone)]
pub struct PredOld<'a> {
/// Delete predicate
predicate: &'a DeletePredicate,
/// At which chunk stage this predicate is applied
delete_time: DeleteTimeOld,
}
#[derive(Debug, Clone)]
pub struct PredNew<'a> {
/// Delete predicate
@ -232,17 +173,12 @@ impl<'a> PredNew<'a> {
#[derive(Debug, Clone)]
pub enum Pred<'a> {
Old(PredOld<'a>),
New(PredNew<'a>),
}
impl<'a> Pred<'a> {
fn new(predicate: &'a DeletePredicate, delete_time: DeleteTime) -> Self {
match delete_time {
DeleteTime::Old(delete_time) => Self::Old(PredOld {
predicate,
delete_time,
}),
DeleteTime::New(delete_time) => Self::New(PredNew {
predicate,
delete_time,
@ -251,46 +187,6 @@ impl<'a> Pred<'a> {
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeleteTimeOld {
/// Delete predicate happens after all chunks created
/// and moved to their corresponding stages
End,
/// Delete predicate is added to chunks at their Mub Open stage
Mubo,
/// Delete predicate is added to chunks at their Mub Frozen stage
Mubf,
/// Delete predicate is added to chunks at their Rub stage
Rub,
/// Delete predicate is added to chunks at their Rub & Os stage
RubOs,
/// Delete predicate is added to chunks at their Os stage
Os,
}
impl DeleteTimeOld {
/// Return all DeleteTime at and after the given chunk stage
pub fn all_from_and_before(chunk_stage: ChunkStageOld) -> Vec<Self> {
match chunk_stage {
ChunkStageOld::Mubo => vec![Self::Mubo],
ChunkStageOld::Mubf => vec![Self::Mubo, Self::Mubf],
ChunkStageOld::Rub => {
vec![Self::Mubo, Self::Mubf, Self::Rub]
}
ChunkStageOld::RubOs => vec![Self::Mubo, Self::Mubf, Self::Rub, Self::RubOs],
ChunkStageOld::Os => vec![Self::Mubo, Self::Mubf, Self::Rub, Self::RubOs, Self::Os],
}
}
pub fn begin() -> Self {
Self::Mubo
}
pub fn end() -> Self {
Self::End
}
}
/// Describes when a delete predicate was applied.
///
/// # Ordering
@ -392,7 +288,6 @@ impl Display for DeleteTimeNew {
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeleteTime {
Old(DeleteTimeOld),
New(DeleteTimeNew),
}
@ -400,10 +295,6 @@ impl DeleteTime {
/// Return all DeleteTime at and after the given chunk stage
fn all_from_and_before(chunk_stage: ChunkStage) -> Vec<Self> {
match chunk_stage {
ChunkStage::Old(chunk_stage) => DeleteTimeOld::all_from_and_before(chunk_stage)
.into_iter()
.map(Self::Old)
.collect(),
ChunkStage::New(chunk_stage) => DeleteTimeNew::all_from_and_before(chunk_stage)
.into_iter()
.map(Self::New)
@ -413,14 +304,12 @@ impl DeleteTime {
fn begin_for(chunk_stage: ChunkStage) -> Self {
match chunk_stage {
ChunkStage::Old(_) => Self::Old(DeleteTimeOld::begin()),
ChunkStage::New(chunk_stage) => Self::New(DeleteTimeNew::begin_for(chunk_stage)),
}
}
fn end_for(chunk_stage: ChunkStage) -> Self {
match chunk_stage {
ChunkStage::Old(_) => Self::Old(DeleteTimeOld::end()),
ChunkStage::New(chunk_stage) => Self::New(DeleteTimeNew::end_for(chunk_stage)),
}
}
@ -498,29 +387,10 @@ async fn make_chunk_with_deletes_at_different_stages(
partition_key: &str,
) -> DbScenario {
match chunk_stage {
ChunkStage::Old(chunk_stage) => {
let preds: Vec<_> = preds
.into_iter()
.map(|p| match p {
Pred::Old(pred) => pred,
Pred::New(_) => panic!("mixed new and old"),
})
.collect();
make_chunk_with_deletes_at_different_stages_old(
lp_lines,
chunk_stage,
preds,
delete_table_name,
partition_key,
)
.await
}
ChunkStage::New(chunk_stage) => {
let preds: Vec<_> = preds
.into_iter()
.map(|p| match p {
Pred::Old(_) => panic!("mixed new and old"),
Pred::New(pred) => pred,
})
.collect();
@ -537,242 +407,6 @@ async fn make_chunk_with_deletes_at_different_stages(
}
}
async fn make_chunk_with_deletes_at_different_stages_old(
lp_lines: Vec<&str>,
chunk_stage: ChunkStageOld,
preds: Vec<PredOld<'_>>,
delete_table_name: &str,
partition_key: &str,
) -> DbScenario {
let db = make_db().await.db;
// ----------------------
// Make an open MUB
//
// There may be more than one table in the LP data
let tables = write_lp(&db, &lp_lines.join("\n"));
for table in &tables {
let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key);
// must be one MUB per table
assert_eq!(num_mubs, 1);
}
// Apply delete predicate
let mut deleted = false;
let mut display = "".to_string();
let mut count = 0;
for pred in &preds {
if pred.delete_time == DeleteTimeOld::Mubo {
db.delete(delete_table_name, Arc::new(pred.predicate.clone()))
.unwrap();
deleted = true;
count += 1;
}
}
if count > 0 {
display.push_str(format!(", with {} deletes from open MUB", count).as_str());
}
// ----------------------
// Freeze MUB if requested
match chunk_stage {
ChunkStageOld::Mubf | ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => {
// Since MUBs are frozen at delete, there is no need to do that for the table with deleted data
if !deleted {
db.rollover_partition(delete_table_name, partition_key)
.await
.unwrap()
.unwrap();
}
// Freeze MUBs of tables that do not have deleted/deleting data
for table in &tables {
if table != delete_table_name {
db.rollover_partition(table.as_str(), partition_key)
.await
.unwrap()
.unwrap();
}
}
// Verify still one MUB and no RUB for each table
for table in &tables {
let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key);
let num_rubs = count_rub_table_chunks(&db, table.as_str(), partition_key);
assert_eq!(num_mubs, 1);
assert_eq!(num_rubs, 0);
}
}
_ => {}
}
// Apply delete predicate
count = 0;
for pred in &preds {
if pred.delete_time == DeleteTimeOld::Mubf {
db.delete(delete_table_name, Arc::new(pred.predicate.clone()))
.unwrap();
count += 1;
}
}
if count > 0 {
display.push_str(format!(", with {} deletes from frozen MUB", count).as_str());
}
// ----------------------
// Move MUB to RUB if requested
match chunk_stage {
ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => {
let mut no_more_data = false;
for table in &tables {
// Compact this MUB of this table
let chunk_result = db.compact_partition(table, partition_key).await.unwrap();
// Verify no MUB and one RUB if not all data was soft deleted
let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key);
let num_rubs = count_rub_table_chunks(&db, table.as_str(), partition_key);
assert_eq!(num_mubs, 0);
// Stop if the compaction result is nothing, which means the MUB held
// only soft-deleted data and no RUB was created. There is no more
// data for further deletes to affect
if table == delete_table_name {
match chunk_result {
Some(_chunk) => {
assert_eq!(num_rubs, 1);
}
None => {
assert_eq!(num_rubs, 0);
no_more_data = true;
}
}
} else {
assert_eq!(num_rubs, 1);
}
}
if no_more_data {
let scenario_name =
format!("Deleted data from one {} chunk{}", chunk_stage, display);
return DbScenario { scenario_name, db };
}
}
_ => {}
}
// Apply delete predicate
count = 0;
for pred in &preds {
if pred.delete_time == DeleteTimeOld::Rub {
db.delete(delete_table_name, Arc::new(pred.predicate.clone()))
.unwrap();
count += 1;
}
}
if count > 0 {
display.push_str(format!(", with {} deletes from RUB", count).as_str());
}
// ----------------------
// Persist RUB to OS if requested
match chunk_stage {
ChunkStageOld::RubOs | ChunkStageOld::Os => {
let mut no_more_data = false;
for table in &tables {
// Persist RUB of this table
let chunk_result = db
.persist_partition(table, partition_key, true)
.await
.unwrap();
// Verify no MUB and one RUB if not all data was soft deleted
let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key);
let num_rubs = count_rub_table_chunks(&db, table.as_str(), partition_key);
let num_os = count_os_table_chunks(&db, table.as_str(), partition_key);
assert_eq!(num_mubs, 0);
// For the table with deletes, two different things can happen
if table == delete_table_name {
match chunk_result {
// Not all rows were soft deleted
Some(_chunk) => {
assert_eq!(num_rubs, 1); // still have RUB with the persisted OS
assert_eq!(num_os, 1);
}
// All rows in the RUB were soft deleted. Nothing remains after compacting,
// and hence nothing is left to persist.
None => {
assert_eq!(num_rubs, 0);
assert_eq!(num_os, 0);
no_more_data = true;
}
}
} else {
assert_eq!(num_rubs, 1);
assert_eq!(num_os, 1);
}
}
if no_more_data {
let scenario_name =
format!("Deleted data from one {} chunk{}", chunk_stage, display);
return DbScenario { scenario_name, db };
}
}
_ => {}
}
// Apply delete predicate
count = 0;
for pred in &preds {
if pred.delete_time == DeleteTimeOld::RubOs {
db.delete(delete_table_name, Arc::new(pred.predicate.clone()))
.unwrap();
count = 1;
}
}
if count > 0 {
display.push_str(format!(", with {} deletes from RUB & OS", count).as_str());
}
// ----------------------
// Unload RUB
if let ChunkStageOld::Os = chunk_stage {
for table in &tables {
// retrieve its chunk_id first
let rub_chunk_ids = chunk_ids_rub(&db, Some(table.as_str()), Some(partition_key));
assert_eq!(rub_chunk_ids.len(), 1);
db.unload_read_buffer(table.as_str(), partition_key, rub_chunk_ids[0])
.unwrap();
// verify chunk stages
let num_mubs = count_mub_table_chunks(&db, table.as_str(), partition_key);
let num_rubs = count_rub_table_chunks(&db, table.as_str(), partition_key);
let num_os = count_os_table_chunks(&db, table.as_str(), partition_key);
assert_eq!(num_mubs, 0);
assert_eq!(num_rubs, 0);
assert_eq!(num_os, 1);
}
}
// Apply delete predicate
count = 0;
for pred in &preds {
if pred.delete_time == DeleteTimeOld::Os || pred.delete_time == DeleteTimeOld::End {
db.delete(delete_table_name, Arc::new(pred.predicate.clone()))
.unwrap();
count += 1;
}
}
if count > 0 {
display.push_str(
format!(
", with {} deletes from OS or after all chunks are created",
count
)
.as_str(),
);
}
let scenario_name = format!("Deleted data from one {} chunk{}", chunk_stage, display);
DbScenario { scenario_name, db }
}
async fn make_chunk_with_deletes_at_different_stages_new(
lp_lines: Vec<&str>,
chunk_stage: ChunkStageNew,
@ -795,205 +429,6 @@ async fn make_chunk_with_deletes_at_different_stages_new(
DbScenario { scenario_name, db }
}
/// Build many chunks which are in different stages
// Note that, after a lot of thought, I decided to have 2 separate functions, this one and the one above.
// The one above tests delete predicates before and/or after a chunk is moved to different stages, while
// this function tests chunks in various stages when one or many deletes happen.
// Even though these 2 functions share some code, merging them into one
// function would create much more complicated cases to handle
pub async fn make_different_stage_chunks_with_deletes_scenario_old(
data: Vec<ChunkDataOld<'_>>,
preds: Vec<&DeletePredicate>,
table_name: &str,
partition_key: &str,
) -> DbScenario {
let db = make_db().await.db;
let mut display = "".to_string();
// Build chunks
for chunk_data in &data {
display.push_str(" - ");
display.push_str(&chunk_data.chunk_stage.to_string());
// ----------
// Make an open MUB
write_lp(&db, &chunk_data.lp_lines.join("\n"));
// 0 does not represent the real chunk id. It is here just to initialize the chunk_id variable for later assignment
let mut chunk_id = db.chunk_summaries()[0].id;
// ----------
// freeze MUB
match chunk_data.chunk_stage {
ChunkStageOld::Mubf | ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => {
let chunk = db
.rollover_partition(table_name, partition_key)
.await
.unwrap()
.unwrap();
chunk_id = chunk.id();
}
_ => {}
}
// ----------
// Move MUB to RUB
match chunk_data.chunk_stage {
ChunkStageOld::Rub | ChunkStageOld::RubOs | ChunkStageOld::Os => {
let chunk = db
.compact_chunks(table_name, partition_key, |chunk| chunk.id() == chunk_id)
.await
.unwrap()
.unwrap();
chunk_id = chunk.id();
}
_ => {}
}
// ----------
// Move RUB to OS
match chunk_data.chunk_stage {
ChunkStageOld::RubOs | ChunkStageOld::Os => {
let chunk = db
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap();
chunk_id = chunk.id();
}
_ => {}
}
// ----------
// Unload RUB
if let ChunkStageOld::Os = chunk_data.chunk_stage {
db.unload_read_buffer(table_name, partition_key, chunk_id)
.unwrap();
}
}
// ----------
// Apply all delete predicates
for pred in &preds {
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
// Scenario of the input chunks and delete predicates
let scenario_name = format!(
"Deleted data from {} chunks, {}, with {} deletes after all chunks are created",
data.len(),
display,
preds.len()
);
DbScenario { scenario_name, db }
}
pub async fn make_os_chunks_and_then_compact_with_different_scenarios_with_delete(
lp_lines_vec: Vec<Vec<&str>>,
preds: Vec<&DeletePredicate>,
table_name: &str,
partition_key: &str,
) -> Vec<DbScenario> {
// Scenario 1: apply deletes and then compact all 3 chunks
let (db, chunk_ids) =
make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await;
for pred in &preds {
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
db.compact_object_store_chunks(table_name, partition_key, chunk_ids)
.unwrap()
.join()
.await;
let scenario_name = "Deletes and then compact all OS chunks".to_string();
let scenario_1 = DbScenario { scenario_name, db };
// Scenario 2: compact all 3 chunks and apply deletes
let (db, chunk_ids) =
make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await;
db.compact_object_store_chunks(table_name, partition_key, chunk_ids)
.unwrap()
.join()
.await;
for pred in &preds {
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
let scenario_name = "Compact all OS chunks and then deletes".to_string();
let scenario_2 = DbScenario { scenario_name, db };
// Scenario 3: apply deletes then compact the first n-1 chunks
let (db, chunk_ids) =
make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await;
for pred in &preds {
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
let (_last_chunk_id, chunk_ids_but_last) = chunk_ids.split_last().unwrap();
db.compact_object_store_chunks(table_name, partition_key, chunk_ids_but_last.to_vec())
.unwrap()
.join()
.await;
let scenario_name = "Deletes and then compact all but last OS chunk".to_string();
let scenario_3 = DbScenario { scenario_name, db };
// Scenario 4: compact the first n-1 chunks then apply deletes
let (db, chunk_ids) =
make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await;
let (_last_chunk_id, chunk_ids_but_last) = chunk_ids.split_last().unwrap();
db.compact_object_store_chunks(table_name, partition_key, chunk_ids_but_last.to_vec())
.unwrap()
.join()
.await;
for pred in &preds {
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
let scenario_name = "Compact all but last OS chunk and then deletes".to_string();
let scenario_4 = DbScenario { scenario_name, db };
vec![scenario_1, scenario_2, scenario_3, scenario_4]
}
async fn make_contiguous_os_chunks(
lp_lines_vec: Vec<Vec<&str>>,
table_name: &str,
partition_key: &str,
) -> (Arc<Db>, Vec<ChunkId>) {
// This test expects at least 3 chunks
assert!(lp_lines_vec.len() >= 3);
// First make all OS chunks for the lp_lines_vec
// Declare them as OS
let mut chunk_data_vec = vec![];
for lp_lines in lp_lines_vec {
let chunk_data = ChunkDataOld {
lp_lines: lp_lines.clone(),
chunk_stage: ChunkStageOld::Os,
};
chunk_data_vec.push(chunk_data);
}
// Make db with those OS chunks
let scenario = make_different_stage_chunks_with_deletes_scenario_old(
chunk_data_vec,
vec![], // do not delete anything yet
table_name,
partition_key,
)
.await;
// Get chunk ids in contiguous order
let db = Arc::downcast::<Db>(scenario.db.as_any_arc()).unwrap();
let partition = db.partition(table_name, partition_key).unwrap();
let partition = partition.read();
let mut keyed_chunks: Vec<(_, _)> = partition
.keyed_chunks()
.into_iter()
.map(|(id, order, _chunk)| (id, order))
.collect();
keyed_chunks.sort_by(|(_id1, order1), (_id2, order2)| order1.cmp(order2));
let chunk_ids: Vec<_> = keyed_chunks.iter().map(|(id, _order)| *id).collect();
(db, chunk_ids)
}
/// This function loads two chunks of lp data into 4 different scenarios
///
/// Data in single open mutable buffer chunk
@ -1005,147 +440,7 @@ pub async fn make_two_chunk_scenarios(
data1: &str,
data2: &str,
) -> Vec<DbScenario> {
make_two_chunk_scenarios_old(partition_key, data1, data2)
.await
.into_iter()
.chain(make_two_chunk_scenarios_new(partition_key, data1, data2).await)
.collect()
}
async fn make_two_chunk_scenarios_old(
partition_key: &str,
data1: &str,
data2: &str,
) -> Vec<DbScenario> {
let db = make_db().await.db;
write_lp(&db, data1);
write_lp(&db, data2);
let scenario1 = DbScenario {
scenario_name: "Data in single open chunk of mutable buffer".into(),
db,
};
// spread across 2 mutable buffer chunks
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
}
write_lp(&db, data2);
let scenario2 = DbScenario {
scenario_name: "Data in one open chunk and one closed chunk of mutable buffer".into(),
db,
};
// spread across 1 mutable buffer, 1 read buffer chunks
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.compact_partition(table_name, partition_key)
.await
.unwrap();
}
write_lp(&db, data2);
let scenario3 = DbScenario {
scenario_name: "Data in open chunk of mutable buffer, and one chunk of read buffer".into(),
db,
};
// in 2 read buffer chunks
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.compact_partition(table_name, partition_key)
.await
.unwrap();
}
let table_names = write_lp(&db, data2);
for table_name in &table_names {
// Compact just the last chunk
db.compact_open_chunk(table_name, partition_key)
.await
.unwrap();
}
let scenario4 = DbScenario {
scenario_name: "Data in two read buffer chunks".into(),
db,
};
// in 2 read buffer chunks that also loaded into object store
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
}
let table_names = write_lp(&db, data2);
for table_name in &table_names {
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
}
let scenario5 = DbScenario {
scenario_name: "Data in two read buffer chunks and two parquet file chunks".into(),
db,
};
// Scenario 6: Two closed chunk in OS only
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
let id = db
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap()
.id();
db.unload_read_buffer(table_name, partition_key, id)
.unwrap();
}
let table_names = write_lp(&db, data2);
for table_name in &table_names {
let id = db
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap()
.id();
db.unload_read_buffer(table_name, partition_key, id)
.unwrap();
}
let scenario6 = DbScenario {
scenario_name: "Data in 2 parquet chunks in object store only".into(),
db,
};
// Scenario 7: in a single chunk resulting from compacting MUB and RUB
let db = make_db().await.db;
let table_names = write_lp(&db, data1);
for table_name in &table_names {
// put chunk 1 into RUB
db.compact_partition(table_name, partition_key)
.await
.unwrap();
}
let table_names = write_lp(&db, data2); // write to MUB
for table_name in &table_names {
// compact chunks into a single RUB chunk
db.compact_partition(table_name, partition_key)
.await
.unwrap();
}
let scenario7 = DbScenario {
scenario_name: "Data in one compacted read buffer chunk".into(),
db,
};
vec![
scenario1, scenario2, scenario3, scenario4, scenario5, scenario6, scenario7,
]
make_two_chunk_scenarios_new(partition_key, data1, data2).await
}
async fn make_two_chunk_scenarios_new(
@ -1221,45 +516,6 @@ pub async fn make_n_chunks_scenario_new(chunks: &[ChunkDataNew<'_, '_>]) -> Vec<
scenarios
}
// This function loads one chunk of lp data into RUB for testing predicate pushdown
pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
partition_key: &str,
data: &str,
) -> Vec<DbScenario> {
// Scenario 1: One closed chunk in RUB
let db = make_db().await.db;
let table_names = write_lp(&db, data);
for table_name in &table_names {
db.compact_partition(table_name, partition_key)
.await
.unwrap();
}
let scenario1 = DbScenario {
scenario_name: "--------------------- Data in read buffer".into(),
db,
};
// Scenario 2: One closed chunk in Parquet only
let db = make_db().await.db;
let table_names = write_lp(&db, data);
for table_name in &table_names {
let id = db
.persist_partition(table_name, partition_key, true)
.await
.unwrap()
.unwrap()
.id();
db.unload_read_buffer(table_name, partition_key, id)
.unwrap();
}
let scenario2 = DbScenario {
scenario_name: "--------------------- Data in object store only ".into(),
db,
};
vec![scenario1, scenario2]
}
/// Create given chunk using the given ingester.
///
/// Returns a human-readable chunk description.

View File

@ -123,28 +123,6 @@ async fn sql_select_from_school() {
.await;
}
#[tokio::test]
async fn sql_select_from_system_operations() {
test_helpers::maybe_start_logging();
let expected = vec![
"+----+---------+------------+---------------+----------------+------------+---------------+-------------------------------------+",
"| id | status | start_time | took_cpu_time | took_wall_time | table_name | partition_key | description |",
"+----+---------+------------+---------------+----------------+------------+---------------+-------------------------------------+",
"| 0 | Success | true | true | true | h2o | 1970-01-01T00 | Compacting chunks to ReadBuffer |",
"| 1 | Success | true | true | true | h2o | 1970-01-01T00 | Persisting chunks to object storage |",
"| 2 | Success | true | true | true | h2o | 1970-01-01T00 | Writing chunk to Object Storage |",
"+----+---------+------------+---------------+----------------+------------+---------------+-------------------------------------+",
];
// Check that the reported CPU time used is greater than zero, as the exact
// value isn't repeatable
run_sql_test_case(
OldTwoMeasurementsManyFieldsLifecycle {},
"SELECT id, status, CAST(start_time as BIGINT) > 0 as start_time, CAST(cpu_time_used AS BIGINT) > 0 as took_cpu_time, CAST(wall_time_used AS BIGINT) > 0 as took_wall_time, table_name, partition_key, description from system.operations",
&expected
).await;
}
#[tokio::test]
async fn sql_union_all() {
// validate name resolution works for UNION ALL queries