fix: Change placeholder new_param to input_size_threshold_bytes
parent
d62b1ed7ee
commit
128833e7d9
|
|
@ -72,7 +72,7 @@ pub struct CompactorConfig {
|
|||
/// The compactor will limit the number of simultaneous compaction jobs based on the
|
||||
/// size of the input files to be compacted. This number should be less than 1/10th
|
||||
/// of the available memory to ensure compactions have
|
||||
/// enough space to run. Default is 1,073,741,824 bytes (1GB ).
|
||||
/// enough space to run. Default is 1,073,741,824 bytes (1GB).
|
||||
#[clap(
|
||||
long = "--compaction-concurrent-size-bytes",
|
||||
env = "INFLUXDB_IOX_COMPACTION_CONCURRENT_SIZE_BYTES",
|
||||
|
|
@ -101,12 +101,14 @@ pub struct CompactorConfig {
|
|||
)]
|
||||
pub min_number_recent_ingested_files_per_partition: usize,
|
||||
|
||||
/// TODO: Describe this. Multiplier?
|
||||
/// A compaction operation will gather as many L0 files with their overlapping L1 files to
|
||||
/// compact together until the total size of input files crosses this threshold. Later
|
||||
/// compactions will pick up the remaining L0 files. Default is 314,572,800 bytes (300MB).
|
||||
#[clap(
|
||||
long = "--new-param",
|
||||
env = "INFLUXDB_IOX_COMPACTION_NEW_PARAM",
|
||||
default_value = "3",
|
||||
long = "--compaction-input-size-threshold-bytes",
|
||||
env = "INFLUXDB_IOX_COMPACTION_INPUT_SIZE_THRESHOLD_BYTES",
|
||||
default_value = "314572800",
|
||||
action
|
||||
)]
|
||||
pub new_param: u64,
|
||||
pub input_size_threshold_bytes: u64,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2613,7 +2613,7 @@ mod tests {
|
|||
let max_concurrent_size_bytes = 100_000;
|
||||
let max_number_partitions_per_sequencer = 1;
|
||||
let min_number_recent_ingested_per_partition = 1;
|
||||
let new_param = 3;
|
||||
let input_size_threshold_bytes = 300 * 1024 * 1024;
|
||||
CompactorConfig::new(
|
||||
max_desired_file_size_bytes,
|
||||
percentage_max_file_size,
|
||||
|
|
@ -2621,7 +2621,7 @@ mod tests {
|
|||
max_concurrent_size_bytes,
|
||||
max_number_partitions_per_sequencer,
|
||||
min_number_recent_ingested_per_partition,
|
||||
new_param,
|
||||
input_size_threshold_bytes,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -130,8 +130,10 @@ pub struct CompactorConfig {
|
|||
/// Min number of recent ingested files a partition needs to be considered for compacting
|
||||
min_number_recent_ingested_files_per_partition: usize,
|
||||
|
||||
/// TODO: describe this. Multiplier?
|
||||
new_param: u64,
|
||||
/// A compaction operation will gather as many L0 files with their overlapping L1 files to
|
||||
/// compact together until the total size of input files crosses this threshold. Later
|
||||
/// compactions will pick up the remaining L0 files.
|
||||
input_size_threshold_bytes: u64,
|
||||
}
|
||||
|
||||
impl CompactorConfig {
|
||||
|
|
@ -143,7 +145,7 @@ impl CompactorConfig {
|
|||
max_concurrent_compaction_size_bytes: u64,
|
||||
max_number_partitions_per_sequencer: usize,
|
||||
min_number_recent_ingested_files_per_partition: usize,
|
||||
new_param: u64,
|
||||
input_size_threshold_bytes: u64,
|
||||
) -> Self {
|
||||
assert!(split_percentage > 0 && split_percentage <= 100);
|
||||
|
||||
|
|
@ -154,7 +156,7 @@ impl CompactorConfig {
|
|||
max_concurrent_compaction_size_bytes,
|
||||
max_number_partitions_per_sequencer,
|
||||
min_number_recent_ingested_files_per_partition,
|
||||
new_param,
|
||||
input_size_threshold_bytes,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -192,9 +194,11 @@ impl CompactorConfig {
|
|||
self.min_number_recent_ingested_files_per_partition
|
||||
}
|
||||
|
||||
/// TODO: Describe this. Multiplier?
|
||||
pub fn new_param(&self) -> u64 {
|
||||
self.new_param
|
||||
/// A compaction operation will gather as many L0 files with their overlapping L1 files to
|
||||
/// compact together until the total size of input files crosses this threshold. Later
|
||||
/// compactions will pick up the remaining L0 files.
|
||||
pub fn input_size_threshold_bytes(&self) -> u64 {
|
||||
self.input_size_threshold_bytes
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -47,11 +47,9 @@ pub(crate) async fn compact_partition(
|
|||
.await
|
||||
.context(ParquetFileLookupSnafu)?;
|
||||
|
||||
let max_bytes = compactor.config.max_desired_file_size_bytes() * compactor.config.new_param();
|
||||
|
||||
let _to_compact = parquet_file_filtering::filter_parquet_files(
|
||||
parquet_files_for_compaction,
|
||||
max_bytes,
|
||||
compactor.config.input_size_threshold_bytes(),
|
||||
&compactor.parquet_file_candidate_gauge,
|
||||
&compactor.parquet_file_candidate_bytes_gauge,
|
||||
);
|
||||
|
|
|
|||
|
|
@ -392,13 +392,13 @@ impl Config {
|
|||
topic: QUERY_POOL_NAME.to_string(),
|
||||
write_buffer_partition_range_start,
|
||||
write_buffer_partition_range_end,
|
||||
max_desired_file_size_bytes: 30000,
|
||||
max_desired_file_size_bytes: 30_000,
|
||||
percentage_max_file_size: 30,
|
||||
split_percentage: 80,
|
||||
max_concurrent_size_bytes: 100000,
|
||||
max_concurrent_size_bytes: 100_000,
|
||||
max_number_partitions_per_sequencer: 1,
|
||||
min_number_recent_ingested_files_per_partition: 1,
|
||||
new_param: 3,
|
||||
input_size_threshold_bytes: 314_572_800,
|
||||
};
|
||||
|
||||
let querier_config = QuerierConfig {
|
||||
|
|
|
|||
|
|
@ -170,7 +170,7 @@ pub async fn create_compactor_server_type(
|
|||
compactor_config.max_concurrent_size_bytes,
|
||||
compactor_config.max_number_partitions_per_sequencer,
|
||||
compactor_config.min_number_recent_ingested_files_per_partition,
|
||||
compactor_config.new_param,
|
||||
compactor_config.input_size_threshold_bytes,
|
||||
);
|
||||
let compactor_handler = Arc::new(CompactorHandlerImpl::new(
|
||||
sequencers,
|
||||
|
|
|
|||
Loading…
Reference in New Issue