Merge branch 'main' into pd/data-generator-many-dbs

pull/24376/head
kodiakhq[bot] 2021-12-08 17:17:58 +00:00 committed by GitHub
commit 90a6b255a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 173 additions and 11 deletions

View File

@ -10,6 +10,7 @@ use crate::scenarios::util::{
ChunkStage, ChunkStage,
}; };
use super::util::make_os_chunks_and_then_compact_with_different_scenarios_with_delete;
use super::{DbScenario, DbSetup}; use super::{DbScenario, DbSetup};
// ========================================================================================================================= // =========================================================================================================================
@ -379,30 +380,50 @@ impl DbSetup for ThreeDeleteThreeChunks {
// 3 chunks: 3 OS // 3 chunks: 3 OS
let lp = vec![ let lp = vec![
ChunkData { ChunkData {
lp_lines: lp_lines_1, lp_lines: lp_lines_1.clone(),
chunk_stage: ChunkStage::Os, chunk_stage: ChunkStage::Os,
}, },
ChunkData { ChunkData {
lp_lines: lp_lines_2, lp_lines: lp_lines_2.clone(),
chunk_stage: ChunkStage::Os, chunk_stage: ChunkStage::Os,
}, },
ChunkData { ChunkData {
lp_lines: lp_lines_3, lp_lines: lp_lines_3.clone(),
chunk_stage: ChunkStage::Os, chunk_stage: ChunkStage::Os,
}, },
]; ];
let scenario_3os = let scenario_3os = make_different_stage_chunks_with_deletes_scenario(
make_different_stage_chunks_with_deletes_scenario(lp, preds, table_name, partition_key) lp,
.await; preds.clone(),
table_name,
partition_key,
)
.await;
// ----------------------
// A few more scenarios to compact all 3 OS chunks or the first 2 OS chunks
// with delete before or after the os_compaction
let compact_os_scenarios =
make_os_chunks_and_then_compact_with_different_scenarios_with_delete(
vec![lp_lines_1.clone(), lp_lines_2.clone(), lp_lines_3.clone()],
preds.clone(),
table_name,
partition_key,
)
.await;
// return scenarios to run queries // return scenarios to run queries
vec![ let mut scenarios = vec![
scenario_mub_rub_os, scenario_mub_rub_os,
scenario_2mub_rub, scenario_2mub_rub,
scenario_2mub_os, scenario_2mub_os,
scenario_2rub_os, scenario_2rub_os,
scenario_rub_2os, scenario_rub_2os,
scenario_3os, scenario_3os,
] ];
scenarios.extend(compact_os_scenarios.into_iter());
scenarios
} }
} }

View File

@ -1,7 +1,9 @@
//! This module contains util functions for testing scenarios //! This module contains util functions for testing scenarios
use data_types::chunk_metadata::ChunkId;
use data_types::delete_predicate::DeletePredicate; use data_types::delete_predicate::DeletePredicate;
use query::{QueryChunk, QueryDatabase}; use query::{QueryChunk, QueryDatabase};
use server::Db;
use std::fmt::Display; use std::fmt::Display;
use std::sync::Arc; use std::sync::Arc;
@ -503,3 +505,110 @@ pub async fn make_different_stage_chunks_with_deletes_scenario(
); );
DbScenario { scenario_name, db } DbScenario { scenario_name, db }
} }
/// Build four [`DbScenario`]s that combine object-store (OS) compaction with
/// delete predicates, varying whether the deletes are applied before or after
/// compaction, and whether all OS chunks or all-but-the-last are compacted:
///   1. deletes, then compact all OS chunks
///   2. compact all OS chunks, then deletes
///   3. deletes, then compact all but the last OS chunk
///   4. compact all but the last OS chunk, then deletes
///
/// Each scenario starts from a fresh db populated with one contiguous OS
/// chunk per entry of `lp_lines_vec` (see `make_contiguous_os_chunks`).
pub async fn make_os_chunks_and_then_compact_with_different_scenarios_with_delete(
lp_lines_vec: Vec<Vec<&str>>,
preds: Vec<&DeletePredicate>,
table_name: &str,
partition_key: &str,
) -> Vec<DbScenario> {
// Scenario 1: apply deletes and then compact all chunks
let (db, chunk_ids) =
make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await;
for pred in &preds {
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
// `join().await` waits for the background compaction job to finish before
// the scenario is handed to the caller.
db.compact_object_store_chunks(table_name, partition_key, chunk_ids)
.unwrap()
.join()
.await;
let scenario_name = "Deletes and then compact all OS chunks".to_string();
let scenario_1 = DbScenario { scenario_name, db };
// Scenario 2: compact all chunks and then apply deletes
let (db, chunk_ids) =
make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await;
db.compact_object_store_chunks(table_name, partition_key, chunk_ids)
.unwrap()
.join()
.await;
for pred in &preds {
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
let scenario_name = "Compact all OS chunks and then deletes".to_string();
let scenario_2 = DbScenario { scenario_name, db };
// Scenario 3: apply deletes then compact the first n-1 chunks
let (db, chunk_ids) =
make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await;
for pred in &preds {
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
// Drop the last chunk id so only the first n-1 chunks get compacted
let (_last_chunk_id, chunk_ids_but_last) = chunk_ids.split_last().unwrap();
db.compact_object_store_chunks(table_name, partition_key, chunk_ids_but_last.to_vec())
.unwrap()
.join()
.await;
let scenario_name = "Deletes and then compact all but last OS chunk".to_string();
let scenario_3 = DbScenario { scenario_name, db };
// Scenario 4: compact the first n-1 chunks then apply deletes
let (db, chunk_ids) =
make_contiguous_os_chunks(lp_lines_vec.clone(), table_name, partition_key).await;
let (_last_chunk_id, chunk_ids_but_last) = chunk_ids.split_last().unwrap();
db.compact_object_store_chunks(table_name, partition_key, chunk_ids_but_last.to_vec())
.unwrap()
.join()
.await;
for pred in &preds {
db.delete(table_name, Arc::new((*pred).clone())).unwrap();
}
let scenario_name = "Compact all but last OS chunk and then deletes".to_string();
let scenario_4 = DbScenario { scenario_name, db };
vec![scenario_1, scenario_2, scenario_3, scenario_4]
}
/// Create a db containing one contiguous OS (object-store) chunk per entry of
/// `lp_lines_vec`, with no delete predicates applied yet.
///
/// Returns the db together with the chunk ids sorted by chunk order, ready to
/// be passed to `compact_object_store_chunks`.
pub async fn make_contiguous_os_chunks(
lp_lines_vec: Vec<Vec<&str>>,
table_name: &str,
partition_key: &str,
) -> (Arc<Db>, Vec<ChunkId>) {
// The compaction scenarios built on top of this helper expect at least 3 chunks
assert!(lp_lines_vec.len() >= 3);
// First make all OS chunks for the entries of lp_lines_vec:
// each set of line-protocol lines becomes one ChunkData staged as OS
let mut chunk_data_vec = vec![];
for lp_lines in lp_lines_vec {
let chunk_data = ChunkData {
lp_lines: lp_lines.clone(),
chunk_stage: ChunkStage::Os,
};
chunk_data_vec.push(chunk_data);
}
// Make db with those OS chunks
let scenario = make_different_stage_chunks_with_deletes_scenario(
chunk_data_vec,
vec![], // no deletes applied yet
table_name,
partition_key,
)
.await;
// Get chunk ids in contiguous order: fetch (id, order) pairs from the
// partition, sort by chunk order, then keep only the ids
let db = scenario.db;
let partition = db.partition(table_name, partition_key).unwrap();
let partition = partition.read();
let mut keyed_chunks: Vec<(_, _)> = partition
.keyed_chunks()
.into_iter()
.map(|(id, order, _chunk)| (id, order))
.collect();
keyed_chunks.sort_by(|(_id1, order1), (_id2, order2)| order1.cmp(order2));
let chunk_ids: Vec<_> = keyed_chunks.iter().map(|(id, _order)| *id).collect();
(db, chunk_ids)
}

View File

@ -201,6 +201,14 @@ impl DbChunk {
} }
} }
/// Returns the contained `ParquetChunk`, if this chunk is stored as parquet
pub fn parquet_chunk(&self) -> Option<&Arc<ParquetChunk>> {
    if let State::ParquetFile { chunk } = &self.state {
        Some(chunk)
    } else {
        None
    }
}
/// Return the address of this chunk /// Return the address of this chunk
pub fn addr(&self) -> &ChunkAddr { pub fn addr(&self) -> &ChunkAddr {
&self.addr &self.addr

View File

@ -796,12 +796,26 @@ mod tests {
assert_eq!(summary_chunks[3].storage, ChunkStorage::ObjectStoreOnly); assert_eq!(summary_chunks[3].storage, ChunkStorage::ObjectStoreOnly);
assert_eq!(summary_chunks[3].row_count, 3); // chunk_id_2 assert_eq!(summary_chunks[3].row_count, 3); // chunk_id_2
// Get min partition checkpoint which is the checkpoint of the first chunk
let min_partition_checkpoint = {
let chunk = chunks[0].clone().chunk;
let chunk = chunk.read();
let parquet_chunk = chunk.parquet_chunk().unwrap();
let iox_parquet_metadata = parquet_chunk.parquet_metadata();
let iox_metadata = iox_parquet_metadata
.decode()
.unwrap()
.read_iox_metadata()
.unwrap();
iox_metadata.partition_checkpoint
};
// compact 3 contiguous chunks 1, 2, 3 // compact 3 contiguous chunks 1, 2, 3
let partition = partition.upgrade(); let partition = partition.upgrade();
let chunk1 = chunks[0].write(); let chunk1 = chunks[0].write();
let chunk2 = chunks[1].write(); let chunk2 = chunks[1].write();
let chunk3 = chunks[2].write(); let chunk3 = chunks[2].write();
let _compacted_chunk = compact_object_store_chunks(partition, vec![chunk1, chunk2, chunk3]) let compacted_chunk = compact_object_store_chunks(partition, vec![chunk1, chunk2, chunk3])
.unwrap() .unwrap()
.1 .1
.await .await
@ -819,6 +833,18 @@ mod tests {
// OS: the result of compacting all 3 persisted chunks // OS: the result of compacting all 3 persisted chunks
assert_eq!(summary_chunks[1].storage, ChunkStorage::ObjectStoreOnly); assert_eq!(summary_chunks[1].storage, ChunkStorage::ObjectStoreOnly);
assert_eq!(summary_chunks[1].row_count, 2); assert_eq!(summary_chunks[1].row_count, 2);
// verify partition checkpoint of the compacted chunk
let chunk = compacted_chunk.unwrap();
let parquet_chunk = chunk.parquet_chunk().unwrap();
let iox_parquet_metadata = parquet_chunk.parquet_metadata();
let iox_metadata = iox_parquet_metadata
.decode()
.unwrap()
.read_iox_metadata()
.unwrap();
let compacted_partition_checkpoint = iox_metadata.partition_checkpoint;
assert_eq!(min_partition_checkpoint, compacted_partition_checkpoint);
} }
#[tokio::test] #[tokio::test]
@ -887,7 +913,5 @@ mod tests {
// todo: add tests // todo: add tests
// . compact with deletes happening during compaction // . compact with deletes happening during compaction
// . verify checkpoints
// . replay // . replay
// . end-to-end tests to not only verify row num but also data
} }