refactor: port more query tests to NG, some code clean up (#4125)

* refactor: inline function that is used once

* refactor: generalize multi-chunk creation for NG

* refactor: `TwoMeasurementsManyFieldsTwoChunks` is OG-specific

* refactor: generalize `OneMeasurementTwoChunksDifferentTagSet`

* refactor: port `OneMeasurementFourChunksWithDuplicates` to NG

* refactor: `TwoMeasurementsManyFieldsLifecycle` is OG-specific

* refactor: simplify NG chunk generation

* refactor: port `ThreeDeleteThreeChunks` to NG

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2022-03-24 15:07:09 +00:00 committed by GitHub
parent fb75ef7b82
commit 8ca5c337b2
9 changed files with 185 additions and 112 deletions
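
For orientation, the ported setups below all follow the same shape: build one `ChunkDataNew` per chunk, lean on its new `Default` impl for everything not under test, and hand the slice to `make_n_chunks_scenario_new`. A condensed sketch of that pattern, assembled from the hunks in this commit (the setup name and the line-protocol values here are placeholders, not part of the diff):

    use async_trait::async_trait;
    use super::util::{make_n_chunks_scenario_new, ChunkDataNew};
    use super::{DbScenario, DbSetup};

    #[derive(Debug)]
    pub struct PortedTwoChunkSetup {} // hypothetical name, for illustration only

    #[async_trait]
    impl DbSetup for PortedTwoChunkSetup {
        async fn make(&self) -> Vec<DbScenario> {
            let partition_key = "1970-01-01T00";
            // One ChunkDataNew per chunk; unspecified fields (chunk stage, delete
            // predicates, delete table) come from the new Default impl.
            make_n_chunks_scenario_new(&[
                ChunkDataNew {
                    lp_lines: vec!["h2o,state=MA temp=70.4 50"],
                    partition_key,
                    ..Default::default()
                },
                ChunkDataNew {
                    lp_lines: vec!["h2o,city=Boston other_temp=72.4 350"],
                    partition_key,
                    ..Default::default()
                },
            ])
            .await
        }
    }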

Cargo.lock (generated)

@@ -4244,6 +4244,7 @@ dependencies = [
"datafusion 0.1.0",
"db",
"iox_tests",
"itertools",
"metric",
"mutable_batch_lp",
"object_store",


@@ -13,6 +13,7 @@ data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
db = { path = "../db" }
iox_tests = { path = "../iox_tests" }
itertools = "0.10"
mutable_batch_lp = { path = "../mutable_batch_lp" }
once_cell = { version = "1.10.0", features = ["parking_lot"] }
predicate = { path = "../predicate" }
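
The new `itertools` dependency is pulled in for `Itertools::permutations`, which the reworked `make_n_chunks_scenario_new` further down uses to try every ordered assignment of chunk stages to the requested chunks. A minimal, standalone illustration of that call (the string values are stand-ins, not real chunk stages):

    use itertools::Itertools;

    fn main() {
        let stages = vec!["A", "B", "C"];
        // Every ordered selection of 2 items out of 3 -> 6 permutations.
        for perm in stages.into_iter().permutations(2) {
            println!("{:?}", perm);
        }
    }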


@@ -1,4 +1,4 @@
-- Test Setup: TwoMeasurementsManyFieldsTwoChunks
-- Test Setup: OldTwoMeasurementsManyFieldsTwoChunks
-- SQL: SELECT partition_key, table_name, column_name, storage, row_count, null_count, min_value, max_value, memory_bytes from system.chunk_columns;
-- Results After Sorting
+---------------+------------+-------------+-------------------+-----------+------------+-----------+-----------+--------------+


@@ -1,4 +1,4 @@
-- IOX_SETUP: TwoMeasurementsManyFieldsTwoChunks
-- IOX_SETUP: OldTwoMeasurementsManyFieldsTwoChunks
--
-- system tables reflect the state of chunks, so don't run them


@@ -52,7 +52,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
register_setup!(TwoMeasurementsManyFields),
register_setup!(TwoMeasurementsPredicatePushDown),
register_setup!(TwoMeasurementsManyFieldsOneChunk),
register_setup!(TwoMeasurementsManyFieldsTwoChunks),
register_setup!(OldTwoMeasurementsManyFieldsTwoChunks),
register_setup!(OldTwoMeasurementsManyFieldsOneRubChunk),
register_setup!(OneMeasurementFourChunksWithDuplicates),
register_setup!(OneMeasurementAllChunksDropped),


@@ -5,6 +5,7 @@ use data_types::timestamp::TimestampRange;
use async_trait::async_trait;
use super::util::{make_n_chunks_scenario_new, ChunkDataNew, PredNew};
use super::{DbScenario, DbSetup};
use crate::scenarios::util::{
all_scenarios_for_one_chunk, make_different_stage_chunks_with_deletes_scenario_old,
@@ -422,6 +423,32 @@ impl DbSetup for ThreeDeleteThreeChunks {
];
scenarios.extend(compact_os_scenarios.into_iter());
scenarios.append(
&mut make_n_chunks_scenario_new(&[
ChunkDataNew {
lp_lines: lp_lines_1,
preds: vec![PredNew::end(&pred1)],
delete_table_name: table_name,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines_2,
preds: vec![PredNew::end(&pred2)],
delete_table_name: table_name,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines_3,
preds: vec![PredNew::end(&pred3)],
delete_table_name: table_name,
partition_key,
..Default::default()
},
])
.await,
);
scenarios
}


@@ -13,10 +13,12 @@ use db::{
};
use query::QueryChunk;
use crate::scenarios::util::{make_n_chunks_scenario_new, ChunkDataNew};
use super::{
util::{
all_scenarios_for_one_chunk, make_one_rub_or_parquet_chunk_scenario,
make_two_chunk_scenarios, rollover_and_load,
make_two_chunk_scenarios,
},
DbScenario, DbSetup,
};
@@ -629,9 +631,11 @@ impl DbSetup for OldTwoMeasurementsManyFieldsOneRubChunk {
#[derive(Debug)]
/// This has two chunks for queries that check the state of the system
pub struct TwoMeasurementsManyFieldsTwoChunks {}
///
/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables.
pub struct OldTwoMeasurementsManyFieldsTwoChunks {}
#[async_trait]
impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
impl DbSetup for OldTwoMeasurementsManyFieldsTwoChunks {
async fn make(&self) -> Vec<DbScenario> {
let db = make_db().await.db;
@@ -669,34 +673,21 @@ pub struct OneMeasurementTwoChunksDifferentTagSet {}
#[async_trait]
impl DbSetup for OneMeasurementTwoChunksDifferentTagSet {
async fn make(&self) -> Vec<DbScenario> {
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
// tag: state
let lp_lines = vec![
let lp_lines1 = vec![
"h2o,state=MA temp=70.4 50",
"h2o,state=MA other_temp=70.4 250",
];
write_lp(&db, &lp_lines.join("\n"));
db.compact_partition("h2o", partition_key).await.unwrap();
// tag: city
let lp_lines = vec![
let lp_lines2 = vec![
"h2o,city=Boston other_temp=72.4 350",
"h2o,city=Boston temp=53.4,reading=51 50",
];
write_lp(&db, &lp_lines.join("\n"));
db.compact_open_chunk("h2o", partition_key).await.unwrap();
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 2);
assert_eq!(count_object_store_chunks(&db), 0);
vec![DbScenario {
scenario_name: "2 chunks in read buffer".into(),
db,
}]
make_two_chunk_scenarios(partition_key, &lp_lines1.join("\n"), &lp_lines2.join("\n")).await
}
}
@@ -713,19 +704,19 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
// Chunk 1:
// . time range: 50-250
// . no duplicates in its own chunk
let lp_lines = vec![
let lp_lines1 = vec![
"h2o,state=MA,city=Boston min_temp=70.4 50",
"h2o,state=MA,city=Bedford min_temp=71.59 150",
"h2o,state=MA,city=Boston max_temp=75.4 250",
"h2o,state=MA,city=Andover max_temp=69.2, 250",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines1.join("\n"));
db.compact_open_chunk("h2o", partition_key).await.unwrap();
// Chunk 2: overlaps with chunk 1
// . time range: 150 - 300
// . no duplicates in its own chunk
let lp_lines = vec![
let lp_lines2 = vec![
"h2o,state=MA,city=Bedford max_temp=78.75,area=742u 150", // new field (area) and update available NULL (max_temp)
"h2o,state=MA,city=Boston min_temp=65.4 250", // update min_temp from NULL
"h2o,state=MA,city=Reading min_temp=53.4, 250",
@@ -733,13 +724,13 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 350",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines2.join("\n"));
db.compact_open_chunk("h2o", partition_key).await.unwrap();
// Chunk 3: no overlap
// . time range: 400 - 500
// . duplicates in its own chunk
let lp_lines = vec![
let lp_lines3 = vec![
"h2o,state=MA,city=Bedford max_temp=80.75,area=742u 400",
"h2o,state=MA,city=Boston min_temp=68.4 400",
"h2o,state=MA,city=Bedford min_temp=65.22,area=750u 400", // duplicate
@@ -747,13 +738,13 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=77.0,max_temp=90.7 450",
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=88.2 500",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines3.join("\n"));
db.compact_open_chunk("h2o", partition_key).await.unwrap();
// Chunk 4: no overlap
// . time range: 600 - 700
// . no duplicates
let lp_lines = vec![
let lp_lines4 = vec![
"h2o,state=MA,city=Bedford max_temp=88.75,area=742u 600",
"h2o,state=MA,city=Boston min_temp=67.4 600",
"h2o,state=MA,city=Reading min_temp=60.4, 600",
@@ -761,26 +752,54 @@ impl DbSetup for OneMeasurementFourChunksWithDuplicates {
"h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 650",
"h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
];
write_lp(&db, &lp_lines.join("\n"));
write_lp(&db, &lp_lines4.join("\n"));
db.compact_open_chunk("h2o", partition_key).await.unwrap();
assert_eq!(count_mutable_buffer_chunks(&db), 0);
assert_eq!(count_read_buffer_chunks(&db), 4);
assert_eq!(count_object_store_chunks(&db), 0);
vec![DbScenario {
let mut scenarios = vec![DbScenario {
scenario_name: "Data in four chunks with duplicates".into(),
db,
}]
}];
scenarios.append(
&mut make_n_chunks_scenario_new(&[
ChunkDataNew {
lp_lines: lp_lines1,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines2,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines3,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines4,
partition_key,
..Default::default()
},
])
.await,
);
scenarios
}
}
#[derive(Debug)]
/// This has a single scenario with all the life cycle operations to
/// test queries that depend on that
pub struct TwoMeasurementsManyFieldsLifecycle {}
///
/// This scenario is OG-specific and can be used for `EXPLAIN` plans and system tables.
pub struct OldTwoMeasurementsManyFieldsLifecycle {}
#[async_trait]
impl DbSetup for TwoMeasurementsManyFieldsLifecycle {
impl DbSetup for OldTwoMeasurementsManyFieldsLifecycle {
async fn make(&self) -> Vec<DbScenario> {
let partition_key = "1970-01-01T00";
@@ -1483,10 +1502,18 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
let data = lp_lines.join("\n");
write_lp(&db, &data);
// roll over and load chunks into both RUB and OS
rollover_and_load(&db, "2020-03-01T00", "h2o").await;
rollover_and_load(&db, "2020-03-02T00", "h2o").await;
rollover_and_load(&db, "2020-04-01T00", "h2o").await;
rollover_and_load(&db, "2020-04-02T00", "h2o").await;
db.persist_partition("h2o", "2020-03-01T00", true)
.await
.unwrap();
db.persist_partition("h2o", "2020-03-02T00", true)
.await
.unwrap();
db.persist_partition("h2o", "2020-04-01T00", true)
.await
.unwrap();
db.persist_partition("h2o", "2020-04-02T00", true)
.await
.unwrap();
let scenario3 = DbScenario {
scenario_name: "Data in 4 partitions, 4 closed chunks in mutable buffer".into(),
db,
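
Side note on the hunk above: `rollover_and_load` was only a thin wrapper around `persist_partition` (its body is visible further down in this diff), so the four inlined calls are equivalent to a small loop over the partition keys; a sketch using the same arguments as in the diff, inside the same async fn with `db` in scope:

    for partition_key in ["2020-03-01T00", "2020-03-02T00", "2020-04-01T00", "2020-04-02T00"] {
        db.persist_partition("h2o", partition_key, true)
            .await
            .unwrap();
    }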


@@ -8,6 +8,7 @@ use db::{
Db,
};
use iox_tests::util::{TestCatalog, TestNamespace};
use itertools::Itertools;
use querier::namespace::QuerierNamespace;
use query::QueryChunk;
use schema::merge::SchemaMerger;
@@ -29,17 +30,42 @@ pub struct ChunkDataOld<'a> {
}
#[derive(Debug, Clone)]
pub struct ChunkDataNew<'a> {
pub struct ChunkDataNew<'a, 'b> {
/// Line protocol data of this chunk
pub lp_lines: Vec<&'a str>,
/// which stage this chunk will be created in
pub chunk_stage: ChunkStageNew,
/// Delete predicates
pub preds: Vec<PredNew<'b>>,
/// Table that should be deleted.
pub delete_table_name: &'a str,
/// Partition key
pub partition_key: &'a str,
}
#[derive(Debug, Clone)]
pub enum ChunkData<'a> {
Old(ChunkDataOld<'a>),
New(ChunkDataNew<'a>),
impl<'a, 'b> ChunkDataNew<'a, 'b> {
fn with_chunk_stage(self, chunk_stage: ChunkStageNew) -> Self {
Self {
chunk_stage,
..self
}
}
}
impl<'a, 'b> Default for ChunkDataNew<'a, 'b> {
fn default() -> Self {
Self {
lp_lines: Default::default(),
chunk_stage: ChunkStageNew::Parquet,
preds: Default::default(),
delete_table_name: "",
partition_key: "",
}
}
}
#[derive(Debug, Clone, Copy)]
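
The manual `Default` impl above (rather than `#[derive(Default)]`) presumably exists so that `chunk_stage` defaults to `ChunkStageNew::Parquet`; callers then spell out only the fields they care about and fill in the rest with struct-update syntax, as the ported setups do. A small usage sketch (line protocol and partition key are placeholders):

    let chunk = ChunkDataNew {
        lp_lines: vec!["h2o,state=MA temp=70.4 50"],
        partition_key: "1970-01-01T00",
        // chunk_stage, preds and delete_table_name all come from Default
        ..Default::default()
    };
    assert!(matches!(chunk.chunk_stage, ChunkStageNew::Parquet));
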
@@ -138,6 +164,22 @@ pub struct PredNew<'a> {
delete_time: DeleteTimeNew,
}
impl<'a> PredNew<'a> {
pub fn begin(predicate: &'a DeletePredicate) -> Self {
Self {
predicate,
delete_time: DeleteTimeNew::begin(),
}
}
pub fn end(predicate: &'a DeletePredicate) -> Self {
Self {
predicate,
delete_time: DeleteTimeNew::end(),
}
}
}
#[derive(Debug, Clone)]
pub enum Pred<'a> {
Old(PredOld<'a>),
@@ -615,15 +657,14 @@ async fn make_chunk_with_deletes_at_different_stages_new(
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("test_db").await;
let scenario_name = make_ng_chunk(
Arc::clone(&ns),
&lp_lines,
let chunk_data = ChunkDataNew {
lp_lines,
chunk_stage,
&preds,
preds,
delete_table_name,
partition_key,
)
.await;
};
let scenario_name = make_ng_chunk(Arc::clone(&ns), chunk_data).await;
let db = make_querier_namespace(ns).await;
@@ -636,16 +677,6 @@ async fn make_chunk_with_deletes_at_different_stages_new(
// this function tests chunks in various stages when one or many deletes happen.
// Even though these two functions have some overlapping code, merging them into one
// function would create much more complicated cases to handle
pub async fn make_different_stage_chunks_with_deletes_scenario(
_data: Vec<ChunkData<'_>>,
_preds: Vec<&DeletePredicate>,
_table_name: &str,
_partition_key: &str,
) -> DbScenario {
// this is used by `delete.rs` but currently that only generates OG data
unimplemented!()
}
pub async fn make_different_stage_chunks_with_deletes_scenario_old(
data: Vec<ChunkDataOld<'_>>,
preds: Vec<&DeletePredicate>,
@@ -796,7 +827,7 @@ pub async fn make_os_chunks_and_then_compact_with_different_scenarios_with_delet
vec![scenario_1, scenario_2, scenario_3, scenario_4]
}
pub async fn make_contiguous_os_chunks(
async fn make_contiguous_os_chunks(
lp_lines_vec: Vec<Vec<&str>>,
table_name: &str,
partition_key: &str,
@@ -998,51 +1029,46 @@ async fn make_two_chunk_scenarios_new(
data1: &str,
data2: &str,
) -> Vec<DbScenario> {
let mut scenarios = vec![];
let lp_lines1: Vec<_> = data1.split('\n').collect();
let lp_lines2: Vec<_> = data2.split('\n').collect();
for chunk_stage1 in ChunkStageNew::all() {
for chunk_stage2 in ChunkStageNew::all() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("test_db").await;
make_n_chunks_scenario_new(&[
ChunkDataNew {
lp_lines: lp_lines1,
partition_key,
..Default::default()
},
ChunkDataNew {
lp_lines: lp_lines2,
partition_key,
..Default::default()
},
])
.await
}
let name1 = make_ng_chunk(
Arc::clone(&ns),
&lp_lines1,
chunk_stage1,
&[],
"not_important",
partition_key,
)
.await;
let name2 = make_ng_chunk(
Arc::clone(&ns),
&lp_lines2,
chunk_stage2,
&[],
"not_important",
partition_key,
)
.await;
pub async fn make_n_chunks_scenario_new(chunks: &[ChunkDataNew<'_, '_>]) -> Vec<DbScenario> {
let mut scenarios = vec![];
let db = make_querier_namespace(ns).await;
let scenario_name = format!("Two chunks: {}; {}", name1, name2);
scenarios.push(DbScenario { scenario_name, db });
for stages in ChunkStageNew::all().into_iter().permutations(chunks.len()) {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("test_db").await;
let mut scenario_name = format!("{} chunks:", chunks.len());
for (chunk_stage, chunk_data) in stages.into_iter().zip(chunks) {
let chunk_data = chunk_data.clone().with_chunk_stage(chunk_stage);
let name = make_ng_chunk(Arc::clone(&ns), chunk_data).await;
write!(&mut scenario_name, "{}", name).unwrap();
}
let db = make_querier_namespace(ns).await;
scenarios.push(DbScenario { scenario_name, db });
}
scenarios
}
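
One property of the `permutations` call above worth keeping in mind: for `k` requested chunks and `s` stages returned by `ChunkStageNew::all()`, this produces s * (s-1) * ... * (s-k+1) scenarios (ordered selections without repetition), and none at all once k exceeds s, since `Itertools::permutations` yields nothing in that case. A quick sketch of that count:

    /// Scenarios produced by make_n_chunks_scenario_new for `k` chunks
    /// when `s` chunk stages are available.
    fn n_scenarios(s: usize, k: usize) -> usize {
        if k > s {
            return 0;
        }
        ((s - k + 1)..=s).product()
    }

    fn main() {
        assert_eq!(n_scenarios(3, 2), 6); // 3 stages, 2 chunks -> 6 scenarios
        assert_eq!(n_scenarios(1, 3), 0); // more chunks than stages -> none
    }
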
/// Rollover the mutable buffer and load chunk 0 to the read buffer and object store
pub async fn rollover_and_load(db: &Arc<Db>, partition_key: &str, table_name: &str) {
db.persist_partition(table_name, partition_key, true)
.await
.unwrap();
}
// This function loads one chunk of lp data into RUB for testing predicate pushdown
pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
partition_key: &str,
@@ -1082,14 +1108,7 @@ pub(crate) async fn make_one_rub_or_parquet_chunk_scenario(
vec![scenario1, scenario2]
}
async fn make_ng_chunk(
ns: Arc<TestNamespace>,
lp_lines: &[&str],
chunk_stage: ChunkStageNew,
preds: &[PredNew<'_>],
delete_table_name: &str,
partition_key: &str,
) -> String {
async fn make_ng_chunk(ns: Arc<TestNamespace>, chunk: ChunkDataNew<'_, '_>) -> String {
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
// detect table names and schemas from LP lines
@@ -1097,9 +1116,7 @@ async fn make_ng_chunk(
let mut lp_lines_grouped: BTreeMap<_, Vec<_>> = BTreeMap::new();
let mut schemas: BTreeMap<_, SchemaMerger> = BTreeMap::new();
for lp in lp_lines {
let lp = *lp;
for lp in chunk.lp_lines {
let (table_name, batch) = lp_to_mutable_batch(lp);
lp_lines_grouped
@@ -1141,7 +1158,7 @@ async fn make_ng_chunk(
for table in tables {
let partition = table
.with_sequencer(&sequencer)
.create_partition(partition_key)
.create_partition(chunk.partition_key)
.await;
partitions.insert(table.table.name.clone(), partition);
}
@@ -1157,7 +1174,7 @@ async fn make_ng_chunk(
}
// create chunk
match chunk_stage {
match chunk.chunk_stage {
ChunkStageNew::Parquet => {
// need to use a temporary vector because BTree iterators ain't `Send`
let lp_lines_grouped: Vec<_> = lp_lines_grouped.into_iter().collect();
@@ -1182,9 +1199,9 @@ async fn make_ng_chunk(
}
// attach delete predicates
let n_preds = preds.len();
if let Some(table) = tables.get(delete_table_name) {
for (i, pred) in preds.iter().enumerate() {
let n_preds = chunk.preds.len();
if let Some(table) = tables.get(chunk.delete_table_name) {
for (i, pred) in chunk.preds.iter().enumerate() {
match pred.delete_time {
DeleteTimeNew::Parquet => {
// parquet files got created w/ sequence number = 1
@@ -1202,7 +1219,7 @@ async fn make_ng_chunk(
}
}
let mut name = format!("NG Chunk {}", chunk_stage);
let mut name = format!("NG Chunk {}", chunk.chunk_stage);
if n_preds > 0 {
write!(name, " with {} deletes", n_preds).unwrap();
}


@@ -139,7 +139,7 @@ async fn sql_select_from_system_operations() {
// Check that the reported cpu time used is greater than zero, as it isn't
// repeatable
run_sql_test_case(
TwoMeasurementsManyFieldsLifecycle {},
OldTwoMeasurementsManyFieldsLifecycle {},
"SELECT id, status, CAST(start_time as BIGINT) > 0 as start_time, CAST(cpu_time_used AS BIGINT) > 0 as took_cpu_time, CAST(wall_time_used AS BIGINT) > 0 as took_wall_time, table_name, partition_key, description from system.operations",
&expected
).await;