feat(compactor): Implement ParquetFileSimulator and use it to show layout testing (#6932)

* feat(compactor): implement ParquetFileSimulator, show it working in tests

* fix: Update compactor2/src/components/parquet_files_sink/simulator.rs

Co-authored-by: Nga Tran <nga-tran@live.com>

* feat: Improve display of plan_ir

* refactor: return CompactResult, avoid `mut TestSetup`

---------

Co-authored-by: Nga Tran <nga-tran@live.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2023-02-10 15:51:17 +01:00 committed by GitHub
parent 92bdb450e0
commit d790406085
10 changed files with 436 additions and 77 deletions

View File

@ -45,9 +45,11 @@ use super::{
level_exist::one_level::OneLevelExist,
parquet_file_sink::{
dedicated::DedicatedExecParquetFileSinkWrapper, logging::LoggingParquetFileSinkWrapper,
mock::MockParquetFileSink, object_store::ObjectStoreParquetFileSink, ParquetFileSink,
object_store::ObjectStoreParquetFileSink,
},
parquet_files_sink::{
dispatch::DispatchParquetFilesSink, simulator::ParquetFileSimulator, ParquetFilesSink,
},
parquet_files_sink::dispatch::DispatchParquetFilesSink,
partition_done_sink::{
catalog::CatalogPartitionDoneSink, error_kind::ErrorKindPartitionDoneSinkWrapper,
logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper,
@ -240,10 +242,10 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
} else {
Arc::new(DedicatedDataFusionPlanExec::new(Arc::clone(&config.exec)))
};
let parquet_file_sink: Arc<dyn ParquetFileSink> = if config.simulate_without_object_store {
Arc::new(MockParquetFileSink::new(false))
let parquet_files_sink: Arc<dyn ParquetFilesSink> = if config.simulate_without_object_store {
Arc::new(ParquetFileSimulator::new())
} else {
Arc::new(LoggingParquetFileSinkWrapper::new(
let parquet_file_sink = Arc::new(LoggingParquetFileSinkWrapper::new(
DedicatedExecParquetFileSinkWrapper::new(
ObjectStoreParquetFileSink::new(
config.shard_id,
@ -252,7 +254,8 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
),
Arc::clone(&config.exec),
),
))
));
Arc::new(DispatchParquetFilesSink::new(parquet_file_sink))
};
Arc::new(Components {
@ -301,7 +304,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
Arc::clone(&config.exec),
)),
df_plan_exec,
parquet_files_sink: Arc::new(DispatchParquetFilesSink::new(parquet_file_sink)),
parquet_files_sink,
round_split: Arc::new(AllNowRoundSplit::new()),
divide_initial: Arc::new(SingleBranchDivideInitial::new()),
scratchpad_gen,

View File

@ -36,6 +36,7 @@ pub struct MockParquetFileSink {
impl MockParquetFileSink {
/// If `filter_empty_files` is true, parquet files with 0 rows will not be recorded as `ParquetFile`s in the catalog.
#[cfg(test)]
pub fn new(filter_empty_files: bool) -> Self {
Self {
filter_empty_files,

View File

@ -14,6 +14,8 @@ use crate::{
use super::ParquetFilesSink;
#[derive(Debug)]
/// Writes parquet files to an inner [`ParquetFileSink`] (note the
/// lack of "s").
pub struct DispatchParquetFilesSink<T>
where
T: ParquetFileSink,
@ -44,7 +46,7 @@ where
#[async_trait]
impl<T> ParquetFilesSink for DispatchParquetFilesSink<T>
where
T: ParquetFileSink,
T: ParquetFileSink + 'static,
{
async fn stream_into_file_sink(
&self,
@ -84,4 +86,8 @@ where
.map_err(|e| Box::new(e) as _)
.await
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}

View File

@ -10,9 +10,13 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use crate::{error::DynError, partition_info::PartitionInfo, plan_ir::PlanIR};
pub mod dispatch;
pub mod simulator;
#[async_trait]
pub trait ParquetFilesSink: Debug + Display + Send + Sync {
/// Writes the streams, which correspond to `plan_ir.files()`, to
/// parquet files on object store, returning information about the
/// files that were created.
async fn stream_into_file_sink(
&self,
streams: Vec<SendableRecordBatchStream>,
@ -20,4 +24,7 @@ pub trait ParquetFilesSink: Debug + Display + Send + Sync {
target_level: CompactionLevel,
plan_ir: &PlanIR,
) -> Result<Vec<ParquetFileParams>, DynError>;
/// Return this files sink as an `Any` dynamic object, so callers can downcast to the concrete type
fn as_any(&self) -> &dyn std::any::Any;
}
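The `as_any` escape hatch lets callers recover the concrete sink behind the trait object, which is what the test harness later in this diff does to pull results out of the simulator. A minimal sketch of that pattern, assuming the module paths introduced in this commit:

use crate::components::parquet_files_sink::{simulator::ParquetFileSimulator, ParquetFilesSink};

// Returns the simulator's recorded runs if the sink is a ParquetFileSimulator,
// or None when a real object-store sink was wired in instead.
// (ParquetFileSimulator::runs is #[cfg(test)], so this helper belongs in test code.)
fn simulator_runs(sink: &dyn ParquetFilesSink) -> Option<Vec<String>> {
    sink.as_any()
        .downcast_ref::<ParquetFileSimulator>()
        .map(|simulator| simulator.runs())
}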

View File

@ -0,0 +1,290 @@
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use data_types::{
ColumnSet, CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, SequenceNumber,
ShardId, Timestamp,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use iox_time::Time;
use observability_deps::tracing::{debug, info};
use uuid::Uuid;
use crate::{error::DynError, partition_info::PartitionInfo, plan_ir::PlanIR};
use super::ParquetFilesSink;
/// Simulates the result of running a compaction plan that
/// produces multiple parquet files.
///
/// In general, since the Parquet format has high and variable
/// compression, it is not possible to predict what the output size of
/// a particular input will be, given just the schema and input
/// sizes. The output size depends heavily on the actual input data as
/// well as the parquet writer settings (e.g. row group size).
///
/// Rather than writing actual files during compactor tests, this
/// simulator produces [`ParquetFileParams`] describing a simulated
/// output of compacting one or more input parquet files, using rules
/// which can be altered to test how the compactor behaves
/// in different scenarios.
///
/// Scenarios that this simulator may offer in the future include:
///
/// 1. The output file size is significantly smaller than the
/// sum of the input files due to deduplication & delete application
///
/// 2. The distribution of data is nonuniform between the start and
/// end boundaries of the input files
///
/// 3. The output file time ranges are different from the union of
/// the input files, due to delete predicate application or
/// non-uniform distribution.
#[derive(Debug)]
pub struct ParquetFileSimulator {
runs: Arc<Mutex<Vec<SimulatedRun>>>,
}
impl std::fmt::Display for ParquetFileSimulator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ParquetFileSimulator")
}
}
impl ParquetFileSimulator {
pub fn new() -> Self {
Self {
runs: Arc::new(Mutex::new(vec![])),
}
}
/// Get a visual display of the simulated runs this simulator
/// has performed, for use in tests
#[cfg(test)]
pub fn runs(&self) -> Vec<String> {
let runs = (*self.runs.lock().unwrap()).clone();
runs.into_iter()
.enumerate()
.flat_map(|(i, run)| {
vec![
format!("**** Simulation Run {}, type={}", i, run.plan_type),
format!(
"Input, {} files: {}",
run.input_file_ids.len(),
run.input_file_ids
.into_iter()
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(", ")
),
]
})
.collect()
}
}
#[async_trait]
impl ParquetFilesSink for ParquetFileSimulator {
async fn stream_into_file_sink(
&self,
_streams: Vec<SendableRecordBatchStream>,
partition_info: Arc<PartitionInfo>,
target_level: CompactionLevel,
plan_ir: &PlanIR,
) -> Result<Vec<ParquetFileParams>, DynError> {
// compute max_l0_created_at
let max_l0_created_at: Time = plan_ir
.input_files()
.iter()
.map(|f| f.file.max_l0_created_at)
.max()
.expect("max_l0_created_at should have value")
.into();
info!("Simulating {plan_ir}");
let (plan_type, split_times): (String, &[i64]) = match plan_ir {
// pretend it is an empty split
PlanIR::Compact { files: _ } => (plan_ir.to_string(), &[]),
PlanIR::Split {
files: _,
split_times,
} => {
let plan_type = format!("{plan_ir}(split_times={split_times:?})");
(plan_type, split_times)
}
};
let input_files: Vec<_> = plan_ir
.input_files()
.iter()
.map(|f| SimulatedFile::from(&f.file))
.collect();
let input_file_ids: Vec<_> = plan_ir.input_files().iter().map(|f| f.file.id).collect();
let output_files = even_time_split(&input_files, split_times, target_level);
// Compute final output
let output: Vec<_> = output_files
.into_iter()
.map(|f| f.into_parquet_file_params(max_l0_created_at, partition_info.as_ref()))
.collect();
// record what we did
self.runs.lock().unwrap().push(SimulatedRun {
plan_type,
input_file_ids,
});
Ok(output)
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
/// Parameters of a `ParquetFile` that are part of the simulation
#[derive(Debug, Clone, Copy)]
pub struct SimulatedFile {
/// the min timestamp of data in this file
pub min_time: Timestamp,
/// the max timestamp of data in this file
pub max_time: Timestamp,
/// file size in bytes
pub file_size_bytes: i64,
/// the number of rows of data in this file
pub row_count: i64,
/// the compaction level of the file
pub compaction_level: CompactionLevel,
}
impl From<&ParquetFile> for SimulatedFile {
fn from(value: &ParquetFile) -> Self {
Self {
min_time: value.min_time,
max_time: value.max_time,
file_size_bytes: value.file_size_bytes,
row_count: value.row_count,
compaction_level: value.compaction_level,
}
}
}
impl std::fmt::Display for SimulatedFile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ID.{}", self.compaction_level as i16)
}
}
impl SimulatedFile {
fn into_parquet_file_params(
self,
max_l0_created_at: Time,
partition_info: &PartitionInfo,
) -> ParquetFileParams {
let Self {
min_time,
max_time,
file_size_bytes,
row_count,
compaction_level,
} = self;
ParquetFileParams {
shard_id: ShardId::new(1),
namespace_id: partition_info.namespace_id,
table_id: partition_info.table.id,
partition_id: partition_info.partition_id,
object_store_id: Uuid::new_v4(),
max_sequence_number: SequenceNumber::new(0),
min_time,
max_time,
file_size_bytes,
row_count,
compaction_level,
created_at: Timestamp::new(1),
column_set: ColumnSet::new(vec![]),
max_l0_created_at: max_l0_created_at.into(),
}
}
}
/// Records the information about what the simulator did, for testing
/// purposes
#[derive(Debug, Clone)]
pub struct SimulatedRun {
// fields are used in testing
#[allow(dead_code)]
plan_type: String,
#[allow(dead_code)]
input_file_ids: Vec<ParquetFileId>,
}
/// Calculate simulated output files based on splitting files
/// according to `split_times`, assuming the data is uniformly
/// distributed between min and max times
fn even_time_split(
files: &[SimulatedFile],
split_times: &[i64],
target_level: CompactionLevel,
) -> Vec<SimulatedFile> {
let overall_min_time = files.iter().map(|f| f.min_time).min().unwrap();
let overall_max_time = files.iter().map(|f| f.max_time).max().unwrap();
let overall_time_range = overall_max_time - overall_min_time;
let total_rows: i64 = files.iter().map(|f| f.row_count).sum();
let total_size: i64 = files.iter().map(|f| f.file_size_bytes).sum();
// compute the time range of data each output file will have
let mut last_time = overall_min_time;
let mut time_ranges: Vec<_> = split_times
.iter()
.map(|time| {
let time = Timestamp::new(*time);
let ret = (last_time, time);
last_time = time;
ret
})
.collect();
// add the entry for the last bucket
time_ranges.push((last_time, overall_max_time));
debug!(
?overall_min_time,
?overall_max_time,
?overall_time_range,
?total_rows,
?total_size,
?time_ranges,
"creating output file from input files"
);
time_ranges
.into_iter()
.map(|(min_time, max_time)| {
let p = ((max_time - min_time).get() as f64) / ((overall_time_range).get() as f64);
let file_size_bytes = (total_size as f64 * p) as i64;
let row_count = (total_rows as f64 * p) as i64;
let compaction_level = target_level;
info!(
?p,
?min_time,
?max_time,
?file_size_bytes,
?row_count,
?compaction_level,
"creating output file with fraction of output"
);
SimulatedFile {
min_time,
max_time,
file_size_bytes,
row_count,
compaction_level,
}
})
.collect()
}
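`even_time_split` apportions bytes and rows purely by the fraction of the overall time range that each output bucket covers. A small worked sketch of that rule, suitable as a unit test inside this module; the input values are hypothetical:

// One 1000-byte, 100-row file spanning [0, 100], split at time 50:
// each bucket covers half the time range, so each simulated output
// receives half the bytes and rows at the target compaction level.
#[test]
fn even_time_split_halves_a_uniform_file() {
    let input = SimulatedFile {
        min_time: Timestamp::new(0),
        max_time: Timestamp::new(100),
        file_size_bytes: 1000,
        row_count: 100,
        compaction_level: CompactionLevel::Initial,
    };
    let output = even_time_split(&[input], &[50], CompactionLevel::FileNonOverlapped);
    assert_eq!(output.len(), 2);
    assert_eq!(output[0].file_size_bytes, 500);
    assert_eq!(output[1].row_count, 50);
    assert_eq!(output[1].compaction_level, CompactionLevel::FileNonOverlapped);
}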

View File

@ -115,7 +115,7 @@ pub struct Config {
///
/// This will still use the catalog
///
/// This is mostly useful for testing.
/// This is useful for testing.
pub simulate_without_object_store: bool,
/// Ensure that ALL errors (including object store errors) result in "skipped" partitions.

View File

@ -7,9 +7,11 @@ use data_types::{ChunkOrder, ParquetFile};
pub enum PlanIR {
/// Compact `files` into a single large output file
Compact { files: Vec<FileIR> },
/// Compact `files` into one file for each entry in `split_times`.
/// Compact `files` into multiple output files based on the entries
/// in `split_times`. If there are `n` entries in `split_times`,
/// there will be `n+1` output files.
///
// The contents of each file:
/// The contents of each file:
/// * `0`: Rows that have `time` *on or before* the `split_times[0]`
/// * `i (0 < i < split_times's length)`: Rows that have `time` in range `(split_times[i-1], split_times[i]]`
/// * `n (n = split_times.len())`: Rows that have `time` *after* all the `split_times` and NULL rows
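Put differently: a row with time `t` lands in the first bucket whose split time is on or after `t`, and in the final bucket if no such split exists. A hedged sketch of that bucketing rule, where `split_bucket` is an illustrative helper rather than anything defined in this diff:

// Index of the output file a row with time `t` lands in, given ascending `split_times`.
fn split_bucket(t: i64, split_times: &[i64]) -> usize {
    // count of split times strictly less than `t`
    split_times.partition_point(|split| *split < t)
}

#[test]
fn split_bucket_examples() {
    assert_eq!(split_bucket(100, &[100, 200]), 0); // on the first split time => file 0
    assert_eq!(split_bucket(150, &[100, 200]), 1); // in (100, 200]           => file 1
    assert_eq!(split_bucket(250, &[100, 200]), 2); // after all split times   => file 2
}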

View File

@ -1,5 +1,7 @@
mod display;
pub(crate) use display::{format_files, format_files_split};
use iox_query::exec::ExecutorType;
use tracker::AsyncSemaphoreMetrics;
use std::{
collections::{BTreeMap, HashSet},
@ -29,8 +31,13 @@ use schema::sort::SortKey;
use uuid::Uuid;
use crate::{
components::namespaces_source::mock::NamespaceWrapper,
components::{
df_planner::panic::PanicDataFusionPlanner, hardcoded::hardcoded_components,
namespaces_source::mock::NamespaceWrapper,
parquet_files_sink::simulator::ParquetFileSimulator, Components,
},
config::{AlgoVersion, Config, PartitionsSourceConfig},
driver::compact,
partition_info::PartitionInfo,
};
@ -621,6 +628,68 @@ impl TestSetup {
pub fn test_times(&self) -> TestTimes {
TestTimes::new(self.config.time_provider.as_ref())
}
/// Run a compaction job, saving the simulator state (if any)
pub async fn run_compact(&self) -> CompactResult {
let components = hardcoded_components(&self.config);
self.run_compact_impl(Arc::clone(&components)).await
}
/// Run a compaction plan in which the DataFusion planner will panic
pub async fn run_compact_failing(&self) -> CompactResult {
let components = hardcoded_components(&self.config);
let components = Arc::new(Components {
df_planner: Arc::new(PanicDataFusionPlanner::new()),
..components.as_ref().clone()
});
self.run_compact_impl(components).await
}
async fn run_compact_impl(&self, components: Arc<Components>) -> CompactResult {
let config = Arc::clone(&self.config);
let job_semaphore = Arc::new(
Arc::new(AsyncSemaphoreMetrics::new(&config.metric_registry, [])).new_semaphore(10),
);
// register scratchpad store
self.catalog
.exec()
.new_context(ExecutorType::Reorg)
.inner()
.runtime_env()
.register_object_store(
"iox",
config.parquet_store_scratchpad.id(),
Arc::clone(config.parquet_store_scratchpad.object_store()),
);
compact(
NonZeroUsize::new(10).unwrap(),
Duration::from_secs(3_6000),
job_semaphore,
&components,
)
.await;
// get the results
let simulator_runs = if let Some(simulator) = components
.parquet_files_sink
.as_any()
.downcast_ref::<ParquetFileSimulator>()
{
simulator.runs()
} else {
vec![]
};
CompactResult { simulator_runs }
}
}
/// Information about the compaction that was run
pub struct CompactResult {
/// [`ParquetFileSimulator`] output, if enabled
pub simulator_runs: Vec<String>,
}
/// A collection of nanosecond timestamps relative to now

View File

@ -179,7 +179,10 @@ impl ParquetFileFormatter {
fn format_level(&self, level: &CompactionLevel) -> String {
let level_heading = display_level(level);
let level_heading = match self.file_size_seen {
FileSizeSeen::One(sz) => format!("{level_heading}, all files {sz}b"),
FileSizeSeen::One(sz) => {
let sz = display_size(sz);
format!("{level_heading}, all files {sz}")
}
_ => level_heading.into(),
};
@ -257,6 +260,21 @@ fn display_file_id(file: &ParquetFile) -> String {
format!("{level}.{id}")
}
fn display_size(sz: i64) -> String {
if sz < 1000 {
format!("{sz}b")
} else if sz < 1000000 {
let kb = (sz as f64) / 1000.0;
format!("{kb}k")
} else if sz < 1000000000 {
let mb = (sz as f64) / 1000000.0;
format!("{mb}m")
} else {
let gb = (sz as f64) / 1000000000.0;
format!("{gb}g")
}
}
/// Compact display of level, id min/max time and optional size.
///
/// Example
@ -270,7 +288,8 @@ fn display_format(file: &ParquetFile, show_size: bool) -> String {
let max_time = file.max_time.get(); // display as i64
let sz = file.file_size_bytes;
if show_size {
format!("{file_id}[{min_time},{max_time}] {sz}b")
let sz = display_size(sz);
format!("{file_id}[{min_time},{max_time}] {sz}")
} else {
format!("{file_id}[{min_time},{max_time}]")
}
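`display_size` uses decimal (power-of-ten) thresholds, which is why the snapshots in this commit now read "1m" and "8m" instead of raw byte counts. A quick sketch of the expected output, assuming the function exactly as written above:

#[test]
fn display_size_examples() {
    assert_eq!(display_size(999), "999b");           // below 1 kB: raw bytes
    assert_eq!(display_size(1_500), "1.5k");         // decimal kilobytes, not 1024-based
    assert_eq!(display_size(1_000_000), "1m");       // the "all files 1m" heading
    assert_eq!(display_size(8_000_000), "8m");       // the 8 MB L1.11 output file
    assert_eq!(display_size(2_500_000_000), "2.5g"); // gigabytes
}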

View File

@ -1,17 +1,11 @@
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use std::time::Duration;
use arrow_util::assert_batches_sorted_eq;
use data_types::{CompactionLevel, ParquetFile, PartitionId};
use iox_query::exec::ExecutorType;
use iox_tests::util::TestParquetFileBuilder;
use tracker::AsyncSemaphoreMetrics;
use crate::{
components::{
df_planner::panic::PanicDataFusionPlanner, hardcoded::hardcoded_components, Components,
},
config::AlgoVersion,
driver::compact,
test_util::{format_files, list_object_store, TestSetup},
};
@ -26,7 +20,7 @@ async fn test_compact_no_file() {
assert!(files.is_empty());
// compact
run_compact(&setup).await;
setup.run_compact().await;
// verify catalog is still empty
let files = setup.list_by_table_not_to_delete().await;
@ -65,7 +59,7 @@ async fn test_num_files_over_limit() {
],
);
run_compact(&setup).await;
setup.run_compact().await;
//
// read files and verify they are not compacted
let files = setup.list_by_table_not_to_delete().await;
@ -119,7 +113,7 @@ async fn test_total_file_size_over_limit() {
],
);
run_compact(&setup).await;
setup.run_compact().await;
// read files and verify they are not compacted
let files = setup.list_by_table_not_to_delete().await;
@ -186,7 +180,7 @@ async fn test_compact_all_at_once() {
);
// compact
run_compact(&setup).await;
setup.run_compact().await;
// verify number of files: 6 files are compacted into 2 files
let files = setup.list_by_table_not_to_delete().await;
@ -292,7 +286,7 @@ async fn test_compact_target_level() {
);
// compact
run_compact(&setup).await;
setup.run_compact().await;
// verify number of files: 6 files are compacted into 2 files
let files = setup.list_by_table_not_to_delete().await;
@ -382,7 +376,7 @@ async fn test_skip_compact() {
.await;
// compact but nothing will be compacted because the partition is skipped
run_compact(&setup).await;
setup.run_compact().await;
// verify still 6 files
let files = setup.list_by_table_not_to_delete().await;
@ -402,7 +396,7 @@ async fn test_partition_fail() {
let object_store_files_pre = list_object_store(&setup.catalog.object_store).await;
assert!(!object_store_files_pre.is_empty());
run_compact_failing(&setup).await;
setup.run_compact_failing().await;
let catalog_files_post = setup.list_by_table_not_to_delete().await;
assert_eq!(catalog_files_pre, catalog_files_post);
@ -439,7 +433,7 @@ async fn test_shadow_mode() {
let object_store_files_pre = list_object_store(&setup.catalog.object_store).await;
assert!(!object_store_files_pre.is_empty());
run_compact(&setup).await;
setup.run_compact().await;
let catalog_files_post = setup.list_by_table_not_to_delete().await;
assert_eq!(catalog_files_pre, catalog_files_post);
@ -467,7 +461,7 @@ async fn test_shadow_mode_partition_fail() {
let object_store_files_pre = list_object_store(&setup.catalog.object_store).await;
assert!(!object_store_files_pre.is_empty());
run_compact_failing(&setup).await;
setup.run_compact_failing().await;
let catalog_files_post = setup.list_by_table_not_to_delete().await;
assert_eq!(catalog_files_pre, catalog_files_post);
@ -515,7 +509,7 @@ async fn test_pr6890() {
@r###"
---
- input
- "L0, all files 1000000b "
- "L0, all files 1m "
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
- "L0.2[100,200] |-------------------------------------L0.2-------------------------------------|"
- "L0.3[100,200] |-------------------------------------L0.3-------------------------------------|"
@ -529,64 +523,32 @@ async fn test_pr6890() {
"###
);
run_compact(&setup).await;
let compact_result = setup.run_compact().await;
assert_skipped_compactions(&setup, []).await;
// ensure the plans did what we wanted
insta::assert_yaml_snapshot!(
compact_result.simulator_runs,
@r###"
---
- "**** Simulation Run 0, type=split(split_times=[180])"
- "Input, 10 files: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10"
"###
);
let output_files = setup.list_by_table_not_to_delete().await;
insta::assert_yaml_snapshot!(
format_files("input", &output_files),
@r###"
---
- input
- "L1, all files 1b "
- "L1.11[0,0] |------------------------------------L1.11-------------------------------------|"
- "L1.12[0,0] |------------------------------------L1.12-------------------------------------|"
- "L1 "
- "L1.11[100,180] 8m |----------------------------L1.11-----------------------------| "
- "L1.12[180,200] 2m |----L1.12-----|"
"###
);
}
async fn run_compact(setup: &TestSetup) {
let components = hardcoded_components(&setup.config);
run_compact_impl(setup, components).await;
}
async fn run_compact_failing(setup: &TestSetup) {
let components = hardcoded_components(&setup.config);
let components = Arc::new(Components {
df_planner: Arc::new(PanicDataFusionPlanner::new()),
..components.as_ref().clone()
});
run_compact_impl(setup, components).await;
}
async fn run_compact_impl(setup: &TestSetup, components: Arc<Components>) {
let config = Arc::clone(&setup.config);
let job_semaphore = Arc::new(
Arc::new(AsyncSemaphoreMetrics::new(&config.metric_registry, [])).new_semaphore(10),
);
// register scratchpad store
setup
.catalog
.exec()
.new_context(ExecutorType::Reorg)
.inner()
.runtime_env()
.register_object_store(
"iox",
config.parquet_store_scratchpad.id(),
Arc::clone(config.parquet_store_scratchpad.object_store()),
);
compact(
NonZeroUsize::new(10).unwrap(),
Duration::from_secs(3_6000),
job_semaphore,
&components,
)
.await;
}
#[track_caller]
fn assert_levels<'a>(
files: impl IntoIterator<Item = &'a ParquetFile>,