chore(iox/compactor): rename max_input_parquet_bytes_per_partition (#7160)
parent
309177b750
commit
c87113ccbf
|
@ -201,7 +201,7 @@ pub struct Compactor2Config {
|
||||||
)]
|
)]
|
||||||
pub max_num_files_per_plan: usize,
|
pub max_num_files_per_plan: usize,
|
||||||
|
|
||||||
/// Maximum input bytes (in parquet) per partition that the
|
/// Maximum input bytes (in parquet) per compaction that the
|
||||||
/// compactor will attempt to compact in any one plan.
|
/// compactor will attempt to compact in any one plan.
|
||||||
///
|
///
|
||||||
/// In the worst case, if the sum of the sizes of all parquet
|
/// In the worst case, if the sum of the sizes of all parquet
|
||||||
|
@ -213,12 +213,12 @@ pub struct Compactor2Config {
|
||||||
/// This setting is a self protection mechanism, and it is
|
/// This setting is a self protection mechanism, and it is
|
||||||
/// expected to be removed in future versions
|
/// expected to be removed in future versions
|
||||||
#[clap(
|
#[clap(
|
||||||
long = "compaction-max-input-parquet-bytes-per-partition",
|
long = "compaction-max-compact-size",
|
||||||
env = "INFLUXDB_IOX_COMPACTION_MAX_INPUT_PARQUET_BYTES_PER_PARTITION",
|
env = "INFLUXDB_IOX_COMPACTION_MAX_COMPACT_SIZE",
|
||||||
default_value = "268435456", // 256MB
|
default_value = "314572800", // 300MB
|
||||||
action
|
action
|
||||||
)]
|
)]
|
||||||
pub max_input_parquet_bytes_per_partition: usize,
|
pub max_compact_size: usize,
|
||||||
|
|
||||||
/// Number of shards.
|
/// Number of shards.
|
||||||
///
|
///
|
||||||
|
|
|
@ -135,7 +135,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
|
||||||
partition_filters.append(&mut make_partition_filters(config));
|
partition_filters.append(&mut make_partition_filters(config));
|
||||||
|
|
||||||
let partition_resource_limit_filters: Vec<Arc<dyn PartitionFilter>> = vec![Arc::new(
|
let partition_resource_limit_filters: Vec<Arc<dyn PartitionFilter>> = vec![Arc::new(
|
||||||
UnableToCompactPartitionFilter::new(config.max_input_parquet_bytes_per_partition),
|
UnableToCompactPartitionFilter::new(config.max_compact_size),
|
||||||
)];
|
)];
|
||||||
|
|
||||||
let partition_done_sink: Arc<dyn PartitionDoneSink> = if config.shadow_mode {
|
let partition_done_sink: Arc<dyn PartitionDoneSink> = if config.shadow_mode {
|
||||||
|
@ -277,10 +277,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
|
||||||
Arc::clone(&config.catalog),
|
Arc::clone(&config.catalog),
|
||||||
)),
|
)),
|
||||||
round_info_source: Arc::new(LoggingRoundInfoWrapper::new(Arc::new(
|
round_info_source: Arc::new(LoggingRoundInfoWrapper::new(Arc::new(
|
||||||
LevelBasedRoundInfo::new(
|
LevelBasedRoundInfo::new(config.max_num_files_per_plan, config.max_compact_size),
|
||||||
config.max_num_files_per_plan,
|
|
||||||
config.max_input_parquet_bytes_per_partition,
|
|
||||||
),
|
|
||||||
))),
|
))),
|
||||||
partition_filter: Arc::new(LoggingPartitionFilterWrapper::new(
|
partition_filter: Arc::new(LoggingPartitionFilterWrapper::new(
|
||||||
MetricsPartitionFilterWrapper::new(
|
MetricsPartitionFilterWrapper::new(
|
||||||
|
@ -316,9 +313,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
|
||||||
TargetLevelSplit::new(),
|
TargetLevelSplit::new(),
|
||||||
NonOverlapSplit::new(),
|
NonOverlapSplit::new(),
|
||||||
UpgradeSplit::new(config.max_desired_file_size_bytes),
|
UpgradeSplit::new(config.max_desired_file_size_bytes),
|
||||||
LoggingSplitOrCompactWrapper::new(SplitCompact::new(
|
LoggingSplitOrCompactWrapper::new(SplitCompact::new(config.max_compact_size)),
|
||||||
config.max_input_parquet_bytes_per_partition,
|
|
||||||
)),
|
|
||||||
),
|
),
|
||||||
))),
|
))),
|
||||||
partition_resource_limit_filter: Arc::new(LoggingPartitionFilterWrapper::new(
|
partition_resource_limit_filter: Arc::new(LoggingPartitionFilterWrapper::new(
|
||||||
|
|
|
@ -30,7 +30,7 @@ pub fn log_config(config: &Config) {
|
||||||
partitions_source,
|
partitions_source,
|
||||||
shadow_mode,
|
shadow_mode,
|
||||||
ignore_partition_skip_marker,
|
ignore_partition_skip_marker,
|
||||||
max_input_parquet_bytes_per_partition,
|
max_compact_size,
|
||||||
shard_config,
|
shard_config,
|
||||||
min_num_l1_files_to_compact,
|
min_num_l1_files_to_compact,
|
||||||
process_once,
|
process_once,
|
||||||
|
@ -77,7 +77,7 @@ pub fn log_config(config: &Config) {
|
||||||
%partitions_source,
|
%partitions_source,
|
||||||
shadow_mode,
|
shadow_mode,
|
||||||
ignore_partition_skip_marker,
|
ignore_partition_skip_marker,
|
||||||
max_input_parquet_bytes_per_partition,
|
max_compact_size,
|
||||||
?shard_cfg_n_shards,
|
?shard_cfg_n_shards,
|
||||||
?shard_cfg_shard_id,
|
?shard_cfg_shard_id,
|
||||||
min_num_l1_files_to_compact,
|
min_num_l1_files_to_compact,
|
||||||
|
|
|
@ -91,9 +91,9 @@ pub struct Config {
|
||||||
/// This is mostly useful for debugging.
|
/// This is mostly useful for debugging.
|
||||||
pub ignore_partition_skip_marker: bool,
|
pub ignore_partition_skip_marker: bool,
|
||||||
|
|
||||||
/// Maximum input bytes (in parquet) per partition. If there is more data, we ignore the partition (for now) as a
|
/// Maximum input bytes (from parquet files) per compaction. If there is more data, we ignore
|
||||||
/// self-protection mechanism.
|
/// the partition (for now) as a self-protection mechanism.
|
||||||
pub max_input_parquet_bytes_per_partition: usize,
|
pub max_compact_size: usize,
|
||||||
|
|
||||||
/// Shard config (if sharding should be enabled).
|
/// Shard config (if sharding should be enabled).
|
||||||
pub shard_config: Option<ShardConfig>,
|
pub shard_config: Option<ShardConfig>,
|
||||||
|
|
|
@ -86,7 +86,7 @@ async fn test_compact_target_level() {
|
||||||
.await
|
.await
|
||||||
// Ensure we have enough resources to compact the files
|
// Ensure we have enough resources to compact the files
|
||||||
.with_max_num_files_per_plan(10)
|
.with_max_num_files_per_plan(10)
|
||||||
.with_max_input_parquet_bytes_per_partition_relative_to_total_size(1000)
|
.with_max_compact_size_relative_to_total_size(1000)
|
||||||
.with_min_num_l1_files_to_compact(2)
|
.with_min_num_l1_files_to_compact(2)
|
||||||
.build()
|
.build()
|
||||||
.await;
|
.await;
|
||||||
|
@ -199,7 +199,7 @@ async fn test_compact_large_overlapes() {
|
||||||
// the test setup does not exceed number of files limit
|
// the test setup does not exceed number of files limit
|
||||||
.with_max_num_files_per_plan(10)
|
.with_max_num_files_per_plan(10)
|
||||||
// the test setup has a total file size that exceeds the max compact size limit
|
// the test setup has a total file size that exceeds the max compact size limit
|
||||||
.with_max_input_parquet_bytes_per_partition_relative_to_total_size(-1)
|
.with_max_compact_size_relative_to_total_size(-1)
|
||||||
.with_min_num_l1_files_to_compact(2)
|
.with_min_num_l1_files_to_compact(2)
|
||||||
.with_max_desired_file_size_bytes(100 * 1024 * 1024)
|
.with_max_desired_file_size_bytes(100 * 1024 * 1024)
|
||||||
.build()
|
.build()
|
||||||
|
@ -307,7 +307,7 @@ async fn test_compact_large_overlape_2() {
|
||||||
// the test setup does not exceed number of files limit
|
// the test setup does not exceed number of files limit
|
||||||
.with_max_num_files_per_plan(10)
|
.with_max_num_files_per_plan(10)
|
||||||
// the test setup exceeds the max compact size limit
|
// the test setup exceeds the max compact size limit
|
||||||
.with_max_input_parquet_bytes_per_partition_relative_to_total_size(-1)
|
.with_max_compact_size_relative_to_total_size(-1)
|
||||||
.with_min_num_l1_files_to_compact(2)
|
.with_min_num_l1_files_to_compact(2)
|
||||||
.with_max_desired_file_size_bytes(100 * 1024 * 1024)
|
.with_max_desired_file_size_bytes(100 * 1024 * 1024)
|
||||||
.build()
|
.build()
|
||||||
|
|
|
@ -393,7 +393,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition() {
|
||||||
// limit the plan to combining 30 MB at a time
|
// limit the plan to combining 30 MB at a time
|
||||||
let setup = layout_setup_builder()
|
let setup = layout_setup_builder()
|
||||||
.await
|
.await
|
||||||
.with_max_input_parquet_bytes_per_partition(30 * ONE_MB as usize)
|
.with_max_compact_size(30 * ONE_MB as usize)
|
||||||
// implicit max desired file size means it is 100MB and greater than the max compact size
|
// implicit max desired file size means it is 100MB and greater than the max compact size
|
||||||
.build()
|
.build()
|
||||||
.await;
|
.await;
|
||||||
|
@ -1083,7 +1083,7 @@ async fn all_overlapping_l0_max_input_bytes_per_partition_small_max_desired_file
|
||||||
// limit the plan to combining 30 MB at a time
|
// limit the plan to combining 30 MB at a time
|
||||||
let setup = layout_setup_builder()
|
let setup = layout_setup_builder()
|
||||||
.await
|
.await
|
||||||
.with_max_input_parquet_bytes_per_partition(30 * ONE_MB as usize)
|
.with_max_compact_size(30 * ONE_MB as usize)
|
||||||
// max desired file size is smaller than the max compact size
|
// max desired file size is smaller than the max compact size
|
||||||
.with_max_desired_file_size_bytes(10 * ONE_MB)
|
.with_max_desired_file_size_bytes(10 * ONE_MB)
|
||||||
.build()
|
.build()
|
||||||
|
|
|
@ -83,7 +83,7 @@ pub(crate) async fn layout_setup_builder() -> TestSetupBuilder<false> {
|
||||||
.with_percentage_max_file_size(20)
|
.with_percentage_max_file_size(20)
|
||||||
.with_split_percentage(80)
|
.with_split_percentage(80)
|
||||||
.with_max_num_files_per_plan(200)
|
.with_max_num_files_per_plan(200)
|
||||||
.with_max_input_parquet_bytes_per_partition(256 * ONE_MB as usize)
|
.with_max_compact_size(256 * ONE_MB as usize)
|
||||||
.with_min_num_l1_files_to_compact(10)
|
.with_min_num_l1_files_to_compact(10)
|
||||||
.with_max_desired_file_size_bytes(100 * ONE_MB)
|
.with_max_desired_file_size_bytes(100 * ONE_MB)
|
||||||
.simulate_without_object_store()
|
.simulate_without_object_store()
|
||||||
|
|
|
@ -135,7 +135,7 @@ impl TestSetupBuilder<false> {
|
||||||
partitions_source: PartitionsSourceConfig::CatalogRecentWrites,
|
partitions_source: PartitionsSourceConfig::CatalogRecentWrites,
|
||||||
shadow_mode: false,
|
shadow_mode: false,
|
||||||
ignore_partition_skip_marker: false,
|
ignore_partition_skip_marker: false,
|
||||||
max_input_parquet_bytes_per_partition: usize::MAX,
|
max_compact_size: usize::MAX,
|
||||||
shard_config: None,
|
shard_config: None,
|
||||||
min_num_l1_files_to_compact: MIN_NUM_L1_FILES_TO_COMPACT,
|
min_num_l1_files_to_compact: MIN_NUM_L1_FILES_TO_COMPACT,
|
||||||
process_once: true,
|
process_once: true,
|
||||||
|
@ -486,15 +486,12 @@ impl TestSetupBuilder<false> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TestSetupBuilder<true> {
|
impl TestSetupBuilder<true> {
|
||||||
/// Set max_input_parquet_bytes_per_partition to the total size of all files plus `delta`
|
/// Set max_compact_size to the total size of all files plus `delta`
|
||||||
pub fn with_max_input_parquet_bytes_per_partition_relative_to_total_size(
|
pub fn with_max_compact_size_relative_to_total_size(self, delta: isize) -> Self {
|
||||||
self,
|
|
||||||
delta: isize,
|
|
||||||
) -> Self {
|
|
||||||
let total_size = self.files.iter().map(|f| f.file_size_bytes).sum::<i64>();
|
let total_size = self.files.iter().map(|f| f.file_size_bytes).sum::<i64>();
|
||||||
Self {
|
Self {
|
||||||
config: Config {
|
config: Config {
|
||||||
max_input_parquet_bytes_per_partition: (total_size as isize + delta) as usize,
|
max_compact_size: (total_size as isize + delta) as usize,
|
||||||
..self.config
|
..self.config
|
||||||
},
|
},
|
||||||
..self
|
..self
|
||||||
|
@ -548,12 +545,9 @@ impl<const WITH_FILES: bool> TestSetupBuilder<WITH_FILES> {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set max_input_parquet_bytes_per_partition
|
/// Set max_compact_size
|
||||||
pub fn with_max_input_parquet_bytes_per_partition(
|
pub fn with_max_compact_size(mut self, max_compact_size: usize) -> Self {
|
||||||
mut self,
|
self.config.max_compact_size = max_compact_size;
|
||||||
max_input_parquet_bytes_per_partition: usize,
|
|
||||||
) -> Self {
|
|
||||||
self.config.max_input_parquet_bytes_per_partition = max_input_parquet_bytes_per_partition;
|
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -479,7 +479,7 @@ impl Config {
|
||||||
partition_filter: None,
|
partition_filter: None,
|
||||||
shadow_mode: false,
|
shadow_mode: false,
|
||||||
ignore_partition_skip_marker: false,
|
ignore_partition_skip_marker: false,
|
||||||
max_input_parquet_bytes_per_partition: 268_435_456, // 256 MB
|
max_compact_size: 268_435_456, // 256 MB
|
||||||
shard_count: None,
|
shard_count: None,
|
||||||
shard_id: None,
|
shard_id: None,
|
||||||
min_num_l1_files_to_compact: 1,
|
min_num_l1_files_to_compact: 1,
|
||||||
|
|
|
@ -32,7 +32,7 @@ use trace::TraceCollector;
|
||||||
const TOPIC: &str = "iox-shared";
|
const TOPIC: &str = "iox-shared";
|
||||||
const TRANSITION_SHARD_INDEX: i32 = TRANSITION_SHARD_NUMBER;
|
const TRANSITION_SHARD_INDEX: i32 = TRANSITION_SHARD_NUMBER;
|
||||||
|
|
||||||
// Minimum multiple between max_desired_file_size_bytes and max_input_parquet_bytes_per_partition
|
// Minimum multiple between max_desired_file_size_bytes and max_compact_size
|
||||||
// Since max_desired_file_size_bytes is softly enforced, actual file sizes can exceed it, and a
|
// Since max_desired_file_size_bytes is softly enforced, actual file sizes can exceed it, and a
|
||||||
// single compaction job must be able to compact >1 max sized file, so the multiple should be at least 3.
|
// single compaction job must be able to compact >1 max sized file, so the multiple should be at least 3.
|
||||||
const MIN_COMPACT_SIZE_MULTIPLE: i64 = 3;
|
const MIN_COMPACT_SIZE_MULTIPLE: i64 = 3;
|
||||||
|
@ -178,13 +178,10 @@ pub async fn create_compactor2_server_type(
|
||||||
};
|
};
|
||||||
|
|
||||||
if compactor_config.max_desired_file_size_bytes as i64 * MIN_COMPACT_SIZE_MULTIPLE
|
if compactor_config.max_desired_file_size_bytes as i64 * MIN_COMPACT_SIZE_MULTIPLE
|
||||||
> compactor_config
|
> compactor_config.max_compact_size.try_into().unwrap()
|
||||||
.max_input_parquet_bytes_per_partition
|
|
||||||
.try_into()
|
|
||||||
.unwrap()
|
|
||||||
{
|
{
|
||||||
panic!("max_input_parquet_bytes_per_partition ({}) must be at least {} times larger than max_desired_file_size_bytes ({})",
|
panic!("max_compact_size ({}) must be at least {} times larger than max_desired_file_size_bytes ({})",
|
||||||
compactor_config.max_input_parquet_bytes_per_partition,
|
compactor_config.max_compact_size,
|
||||||
MIN_COMPACT_SIZE_MULTIPLE,
|
MIN_COMPACT_SIZE_MULTIPLE,
|
||||||
compactor_config.max_desired_file_size_bytes);
|
compactor_config.max_desired_file_size_bytes);
|
||||||
}
|
}
|
||||||
|
@ -212,8 +209,7 @@ pub async fn create_compactor2_server_type(
|
||||||
partitions_source,
|
partitions_source,
|
||||||
shadow_mode: compactor_config.shadow_mode,
|
shadow_mode: compactor_config.shadow_mode,
|
||||||
ignore_partition_skip_marker: compactor_config.ignore_partition_skip_marker,
|
ignore_partition_skip_marker: compactor_config.ignore_partition_skip_marker,
|
||||||
max_input_parquet_bytes_per_partition: compactor_config
|
max_compact_size: compactor_config.max_compact_size,
|
||||||
.max_input_parquet_bytes_per_partition,
|
|
||||||
shard_config,
|
shard_config,
|
||||||
min_num_l1_files_to_compact: compactor_config.min_num_l1_files_to_compact,
|
min_num_l1_files_to_compact: compactor_config.min_num_l1_files_to_compact,
|
||||||
process_once: compactor_config.process_once,
|
process_once: compactor_config.process_once,
|
||||||
|
|
Loading…
Reference in New Issue