feat: config params for Compactor (#5108)
* feat: config params for Compactor * refactor: address review comments Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
9e09f77a45
commit
5c5c964dfe
|
|
@ -27,15 +27,58 @@ pub struct CompactorConfig {
|
|||
)]
|
||||
pub write_buffer_partition_range_end: i32,
|
||||
|
||||
/// Percentage of least recent data we want to split to reduce compacting non-overlapped data
|
||||
/// Must be between 0 and 100. Default is 100, which won't split the resulting file.
|
||||
/// Max number of level-0 files (written by ingester) we want to compact with level-1 each time
|
||||
/// Default: 3
|
||||
#[clap(
|
||||
// TODO: verify the theory that if the streaming works as expected,
|
||||
// we do not need to limit this number
|
||||
long = "--compaction-max-number-level-0-files",
|
||||
env = "INFLUXDB_IOX_COMPACTION_MAX_NUMBER_LEVEL_0_FILES",
|
||||
default_value = "3",
|
||||
action
|
||||
)]
|
||||
pub compaction_max_number_level_0_files: i32,
|
||||
|
||||
/// Desired max size of compacted parquet files
|
||||
/// It is a target desired value rather than a guarantee
|
||||
/// Default is 100,000,000 (100MB)
|
||||
#[clap(
|
||||
long = "--compaction-max-desired-size-bytes",
|
||||
env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES",
|
||||
default_value = "100000000",
|
||||
action
|
||||
)]
|
||||
pub compaction_max_desired_file_size_bytes: i64,
|
||||
|
||||
/// Percentage of desired max file size.
|
||||
/// If the estimated compacted result is too small, no need to split it.
|
||||
/// This percentage is to determine how small it is:
|
||||
/// < compaction_percentage_max_file_size * compaction_max_desired_file_size_bytes:
|
||||
/// This value must be between (0, 100)
|
||||
/// Default is 30
|
||||
#[clap(
|
||||
long = "--compaction-percentage-max-file_size",
|
||||
env = "INFLUXDB_IOX_COMPACTION_PERCENTAGE_MAX_FILE_SIZE",
|
||||
default_value = "30",
|
||||
action
|
||||
)]
|
||||
pub compaction_percentage_max_file_size: i16,
|
||||
|
||||
/// Split file percentage
|
||||
/// If the estimated compacted result is neither too small nor too large, it will be split
|
||||
/// into 2 files determined by this percentage.
|
||||
/// . Too small means: < compaction_percentage_max_file_size * compaction_max_desired_file_size_bytes
|
||||
/// . Too large means: > compaction_max_desired_file_size_bytes
|
||||
/// . Any size in the middle will be considered neither too small nor too large
|
||||
/// This value must be between (0, 100)
|
||||
/// Default is 80
|
||||
#[clap(
|
||||
long = "--compaction-split-percentage",
|
||||
env = "INFLUXDB_IOX_COMPACTION_SPLIT_PERCENTAGE",
|
||||
default_value = "100",
|
||||
default_value = "80",
|
||||
action
|
||||
)]
|
||||
pub split_percentage: i64,
|
||||
pub compaction_split_percentage: i16,
|
||||
|
||||
/// The compactor will limit the number of simultaneous compaction jobs based on the
|
||||
/// size of the input files to be compacted. This number should be less than 1/10th
|
||||
|
|
@ -44,40 +87,8 @@ pub struct CompactorConfig {
|
|||
#[clap(
|
||||
long = "--compaction-concurrent-size-bytes",
|
||||
env = "INFLUXDB_IOX_COMPACTION_CONCURRENT_SIZE_BYTES",
|
||||
default_value = "1000000000",
|
||||
action
|
||||
)]
|
||||
pub max_concurrent_compaction_size_bytes: i64,
|
||||
|
||||
/// The compactor will compact overlapped files with non-overlapped and contiguous files
|
||||
/// a larger file of max size defined by the config value.
|
||||
/// Default is 100,000,000 (100MB)
|
||||
#[clap(
|
||||
long = "--compaction-max-size-bytes",
|
||||
env = "INFLUXDB_IOX_COMPACTION_MAX_SIZE_BYTES",
|
||||
default_value = "100000000",
|
||||
action
|
||||
)]
|
||||
pub compaction_max_size_bytes: i64,
|
||||
|
||||
/// Limit the number of files per compaction
|
||||
/// Default is 100
|
||||
#[clap(
|
||||
long = "--compaction-max-file-count",
|
||||
env = "INFLUXDB_IOX_COMPACTION_MAX_FILE_COUNT",
|
||||
default_value = "100",
|
||||
action
|
||||
)]
|
||||
pub compaction_max_file_count: i64,
|
||||
|
||||
/// If the compacted result is too large, the compactor will split it into many files each
|
||||
/// estimated to be smaller than this value
|
||||
/// Default is 30,000,000 (30MB)
|
||||
#[clap(
|
||||
long = "--compaction-max-desired-size-bytes",
|
||||
env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES",
|
||||
default_value = "30000000",
|
||||
action
|
||||
)]
|
||||
pub compaction_max_desired_file_size_bytes: i64,
|
||||
pub max_concurrent_compaction_size_bytes: i64,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1157,11 +1157,7 @@ mod tests {
|
|||
|
||||
// ------------------------------------------------
|
||||
// Compact
|
||||
let split_percentage = 90;
|
||||
let max_concurrent_compaction_size_bytes = 100000;
|
||||
let compaction_max_size_bytes = 100000;
|
||||
let compaction_max_file_count = 10;
|
||||
let compaction_max_desired_file_size_bytes = 30000;
|
||||
let config = make_compactor_config();
|
||||
let compactor = Compactor::new(
|
||||
vec![sequencer.sequencer.id],
|
||||
Arc::clone(&catalog.catalog),
|
||||
|
|
@ -1169,13 +1165,7 @@ mod tests {
|
|||
Arc::new(Executor::new(1)),
|
||||
Arc::new(SystemProvider::new()),
|
||||
BackoffConfig::default(),
|
||||
CompactorConfig::new(
|
||||
split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
compaction_max_size_bytes,
|
||||
compaction_max_file_count,
|
||||
compaction_max_desired_file_size_bytes,
|
||||
),
|
||||
config,
|
||||
Arc::new(metric::Registry::new()),
|
||||
);
|
||||
|
||||
|
|
@ -1296,11 +1286,7 @@ mod tests {
|
|||
.create_partition("part")
|
||||
.await;
|
||||
let time = Arc::new(SystemProvider::new());
|
||||
let split_percentage = 90;
|
||||
let max_concurrent_compaction_size_bytes = 100000;
|
||||
let compaction_max_size_bytes = 100000;
|
||||
let compaction_max_file_count = 10;
|
||||
let compaction_max_desired_file_size_bytes = 30000;
|
||||
let config = make_compactor_config();
|
||||
let compactor = Compactor::new(
|
||||
vec![sequencer.sequencer.id],
|
||||
Arc::clone(&catalog.catalog),
|
||||
|
|
@ -1308,13 +1294,7 @@ mod tests {
|
|||
Arc::new(Executor::new(1)),
|
||||
Arc::new(SystemProvider::new()),
|
||||
BackoffConfig::default(),
|
||||
CompactorConfig::new(
|
||||
split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
compaction_max_size_bytes,
|
||||
compaction_max_file_count,
|
||||
compaction_max_desired_file_size_bytes,
|
||||
),
|
||||
config,
|
||||
Arc::new(metric::Registry::new()),
|
||||
);
|
||||
|
||||
|
|
@ -1329,7 +1309,7 @@ mod tests {
|
|||
1,
|
||||
10,
|
||||
20,
|
||||
compactor.config.compaction_max_size_bytes() + 10,
|
||||
config.compaction_max_desired_file_size_bytes() + 10,
|
||||
20,
|
||||
)
|
||||
.await;
|
||||
|
|
@ -1498,11 +1478,7 @@ mod tests {
|
|||
.await
|
||||
.parquet_file;
|
||||
|
||||
let split_percentage = 90;
|
||||
let max_concurrent_compaction_size_bytes = 100000;
|
||||
let compaction_max_size_bytes = 100000;
|
||||
let compaction_max_file_count = 10;
|
||||
let compaction_max_desired_file_size_bytes = 30000;
|
||||
let config = make_compactor_config();
|
||||
let compactor = Compactor::new(
|
||||
vec![sequencer.sequencer.id],
|
||||
Arc::clone(&catalog.catalog),
|
||||
|
|
@ -1510,13 +1486,7 @@ mod tests {
|
|||
Arc::new(Executor::new(1)),
|
||||
Arc::new(SystemProvider::new()),
|
||||
BackoffConfig::default(),
|
||||
CompactorConfig::new(
|
||||
split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
compaction_max_size_bytes,
|
||||
compaction_max_file_count,
|
||||
compaction_max_desired_file_size_bytes,
|
||||
),
|
||||
config,
|
||||
Arc::new(metric::Registry::new()),
|
||||
);
|
||||
|
||||
|
|
@ -1593,11 +1563,7 @@ mod tests {
|
|||
.await
|
||||
.parquet_file;
|
||||
|
||||
let split_percentage = 90;
|
||||
let max_concurrent_compaction_size_bytes = 100000;
|
||||
let compaction_max_size_bytes = 100000;
|
||||
let compaction_max_file_count = 10;
|
||||
let compaction_max_desired_file_size_bytes = 30000;
|
||||
let config = make_compactor_config();
|
||||
let compactor = Compactor::new(
|
||||
vec![sequencer.sequencer.id],
|
||||
Arc::clone(&catalog.catalog),
|
||||
|
|
@ -1605,13 +1571,7 @@ mod tests {
|
|||
Arc::new(Executor::new(1)),
|
||||
Arc::new(SystemProvider::new()),
|
||||
BackoffConfig::default(),
|
||||
CompactorConfig::new(
|
||||
split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
compaction_max_size_bytes,
|
||||
compaction_max_file_count,
|
||||
compaction_max_desired_file_size_bytes,
|
||||
),
|
||||
config,
|
||||
Arc::new(metric::Registry::new()),
|
||||
);
|
||||
|
||||
|
|
@ -1724,11 +1684,7 @@ mod tests {
|
|||
.await
|
||||
.parquet_file;
|
||||
|
||||
let split_percentage = 90;
|
||||
let max_concurrent_compaction_size_bytes = 100000;
|
||||
let compaction_max_size_bytes = 100000;
|
||||
let compaction_max_file_count = 10;
|
||||
let compaction_max_desired_file_size_bytes = 30000;
|
||||
let config = make_compactor_config();
|
||||
let compactor = Compactor::new(
|
||||
vec![sequencer.sequencer.id],
|
||||
Arc::clone(&catalog.catalog),
|
||||
|
|
@ -1736,13 +1692,7 @@ mod tests {
|
|||
Arc::new(Executor::new(1)),
|
||||
Arc::new(SystemProvider::new()),
|
||||
BackoffConfig::default(),
|
||||
CompactorConfig::new(
|
||||
split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
compaction_max_size_bytes,
|
||||
compaction_max_file_count,
|
||||
compaction_max_desired_file_size_bytes,
|
||||
),
|
||||
config,
|
||||
Arc::new(metric::Registry::new()),
|
||||
);
|
||||
|
||||
|
|
@ -1941,11 +1891,7 @@ mod tests {
|
|||
async fn test_add_parquet_file_with_tombstones() {
|
||||
let catalog = TestCatalog::new();
|
||||
|
||||
let split_percentage = 90;
|
||||
let max_concurrent_compaction_size_bytes = 100000;
|
||||
let compaction_max_size_bytes = 100000;
|
||||
let compaction_max_file_count = 10;
|
||||
let compaction_max_desired_file_size_bytes = 30000;
|
||||
let config = make_compactor_config();
|
||||
let compactor = Compactor::new(
|
||||
vec![],
|
||||
Arc::clone(&catalog.catalog),
|
||||
|
|
@ -1953,13 +1899,7 @@ mod tests {
|
|||
Arc::new(Executor::new(1)),
|
||||
Arc::new(SystemProvider::new()),
|
||||
BackoffConfig::default(),
|
||||
CompactorConfig::new(
|
||||
split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
compaction_max_size_bytes,
|
||||
compaction_max_file_count,
|
||||
compaction_max_desired_file_size_bytes,
|
||||
),
|
||||
config,
|
||||
Arc::new(metric::Registry::new()),
|
||||
);
|
||||
|
||||
|
|
@ -2279,11 +2219,7 @@ mod tests {
|
|||
.unwrap();
|
||||
txn.commit().await.unwrap();
|
||||
|
||||
let split_percentage = 90;
|
||||
let max_concurrent_compaction_size_bytes = 100000;
|
||||
let compaction_max_size_bytes = 100000;
|
||||
let compaction_max_file_count = 10;
|
||||
let compaction_max_desired_file_size_bytes = 30000;
|
||||
let config = make_compactor_config();
|
||||
let compactor = Compactor::new(
|
||||
vec![sequencer.id],
|
||||
Arc::clone(&catalog.catalog),
|
||||
|
|
@ -2291,13 +2227,7 @@ mod tests {
|
|||
Arc::new(Executor::new(1)),
|
||||
Arc::new(SystemProvider::new()),
|
||||
BackoffConfig::default(),
|
||||
CompactorConfig::new(
|
||||
split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
compaction_max_size_bytes,
|
||||
compaction_max_file_count,
|
||||
compaction_max_desired_file_size_bytes,
|
||||
),
|
||||
config,
|
||||
Arc::new(metric::Registry::new()),
|
||||
);
|
||||
|
||||
|
|
@ -2372,11 +2302,7 @@ mod tests {
|
|||
.await
|
||||
.parquet_file;
|
||||
|
||||
let split_percentage = 90;
|
||||
let max_concurrent_compaction_size_bytes = 100000;
|
||||
let compaction_max_size_bytes = 100000;
|
||||
let compaction_max_file_count = 10;
|
||||
let compaction_max_desired_file_size_bytes = 30000;
|
||||
let config = make_compactor_config();
|
||||
let compactor = Compactor::new(
|
||||
vec![sequencer.sequencer.id],
|
||||
Arc::clone(&catalog.catalog),
|
||||
|
|
@ -2384,13 +2310,7 @@ mod tests {
|
|||
Arc::new(Executor::new(1)),
|
||||
Arc::new(SystemProvider::new()),
|
||||
BackoffConfig::default(),
|
||||
CompactorConfig::new(
|
||||
split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
compaction_max_size_bytes,
|
||||
compaction_max_file_count,
|
||||
compaction_max_desired_file_size_bytes,
|
||||
),
|
||||
config,
|
||||
Arc::new(metric::Registry::new()),
|
||||
);
|
||||
|
||||
|
|
@ -2452,4 +2372,19 @@ mod tests {
|
|||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn make_compactor_config() -> CompactorConfig {
|
||||
let compaction_max_number_level_0_files = 3;
|
||||
let compaction_max_desired_file_size_bytes = 10_000;
|
||||
let compaction_percentage_max_file_size = 30;
|
||||
let compaction_split_percentage = 80;
|
||||
let max_concurrent_compaction_size_bytes = 100_000;
|
||||
CompactorConfig::new(
|
||||
compaction_max_number_level_0_files,
|
||||
compaction_max_desired_file_size_bytes,
|
||||
compaction_percentage_max_file_size,
|
||||
compaction_split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,49 +99,74 @@ impl CompactorHandlerImpl {
|
|||
/// The configuration options for the compactor.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct CompactorConfig {
|
||||
/// Percentage of least recent data we want to split to reduce compacting non-overlapped data
|
||||
/// Must be between 0 and 100.
|
||||
split_percentage: i64,
|
||||
/// Max number of level-0 files (written by ingester) we want to compact with level-1 each time
|
||||
compaction_max_number_level_0_files: i32,
|
||||
|
||||
/// Desired max size of compacted parquet files
|
||||
/// It is a target desired value rather than a guarantee
|
||||
compaction_max_desired_file_size_bytes: i64,
|
||||
|
||||
/// Percentage of desired max file size.
|
||||
/// If the estimated compacted result is too small, no need to split it.
|
||||
/// This percentage is to determine how small it is:
|
||||
/// < compaction_percentage_max_file_size * compaction_max_desired_file_size_bytes:
|
||||
/// This value must be between (0, 100)
|
||||
compaction_percentage_max_file_size: i16,
|
||||
|
||||
/// Split file percentage
|
||||
/// If the estimated compacted result is neither too small nor too large, it will be split
|
||||
/// into 2 files determined by this percentage.
|
||||
/// . Too small means: < compaction_percentage_max_file_size * compaction_max_desired_file_size_bytes
|
||||
/// . Too large means: > compaction_max_desired_file_size_bytes
|
||||
/// . Any size in the middle will be considered neither too small nor too large
|
||||
/// This value must be between (0, 100)
|
||||
compaction_split_percentage: i16,
|
||||
|
||||
/// The compactor will limit the number of simultaneous compaction jobs based on the
|
||||
/// size of the input files to be compacted. Currently this only takes into account the
|
||||
/// level 0 and 1 files, but should later also consider the level 2 files to be compacted. This
|
||||
/// number should be less than 1/10th of the available memory to ensure compactions have
|
||||
/// size of the input files to be compacted. This number should be less than 1/10th
|
||||
/// of the available memory to ensure compactions have
|
||||
/// enough space to run.
|
||||
max_concurrent_compaction_size_bytes: i64,
|
||||
/// The compactor will compact overlapped files but if the total size of the files
|
||||
/// exceeds this value, the content of each file will be split in the current compaction cycle
|
||||
/// so in the next cycle, they will be small enough to get compacted
|
||||
compaction_max_size_bytes: i64,
|
||||
/// Limit the number of files to compact into one file
|
||||
compaction_max_file_count: i64, // TODO: remove this value
|
||||
/// If the compacted result is larger than this value, it will be persisted into
|
||||
/// many files, each is estimated smaller than this value
|
||||
compaction_max_desired_file_size_bytes: i64,
|
||||
}
|
||||
|
||||
impl CompactorConfig {
|
||||
/// Initialize a valid config
|
||||
pub fn new(
|
||||
split_percentage: i64,
|
||||
max_concurrent_compaction_size_bytes: i64,
|
||||
compaction_max_size_bytes: i64,
|
||||
compaction_max_file_count: i64,
|
||||
compaction_max_number_level_0_files: i32,
|
||||
compaction_max_desired_file_size_bytes: i64,
|
||||
compaction_percentage_max_file_size: i16,
|
||||
compaction_split_percentage: i16,
|
||||
max_concurrent_compaction_size_bytes: i64,
|
||||
) -> Self {
|
||||
assert!(split_percentage > 0 && split_percentage <= 100);
|
||||
assert!(compaction_split_percentage > 0 && compaction_split_percentage <= 100);
|
||||
|
||||
Self {
|
||||
split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
compaction_max_size_bytes,
|
||||
compaction_max_file_count,
|
||||
compaction_max_number_level_0_files,
|
||||
compaction_max_desired_file_size_bytes,
|
||||
compaction_percentage_max_file_size,
|
||||
compaction_split_percentage,
|
||||
max_concurrent_compaction_size_bytes,
|
||||
}
|
||||
}
|
||||
|
||||
/// Max number of level-0 files we want to compact with level-1 each time
|
||||
pub fn compaction_max_number_level_0_files(&self) -> i32 {
|
||||
self.compaction_max_number_level_0_files
|
||||
}
|
||||
|
||||
/// Desired max size of a compacted file
|
||||
pub fn compaction_max_desired_file_size_bytes(&self) -> i64 {
|
||||
self.compaction_max_desired_file_size_bytes
|
||||
}
|
||||
|
||||
/// Percentage of desired max file size to determine a size is too small
|
||||
pub fn compaction_percentage_max_file_size(&self) -> i16 {
|
||||
self.compaction_percentage_max_file_size
|
||||
}
|
||||
|
||||
/// Percentage of least recent data we want to split to reduce compacting non-overlapped data
|
||||
pub fn split_percentage(&self) -> i64 {
|
||||
self.split_percentage
|
||||
pub fn compaction_split_percentage(&self) -> i16 {
|
||||
self.compaction_split_percentage
|
||||
}
|
||||
|
||||
/// The compactor will limit the number of simultaneous compaction jobs based on the
|
||||
|
|
@ -152,23 +177,6 @@ impl CompactorConfig {
|
|||
pub fn max_concurrent_compaction_size_bytes(&self) -> i64 {
|
||||
self.max_concurrent_compaction_size_bytes
|
||||
}
|
||||
|
||||
/// The compactor will compact overlapped files no matter how large they are.
|
||||
/// For non-overlapped and contiguous files, compactor will also compact them into
|
||||
/// a larger file of max size defined by this config value.
|
||||
pub fn compaction_max_size_bytes(&self) -> i64 {
|
||||
self.compaction_max_size_bytes
|
||||
}
|
||||
|
||||
/// Max number of files to compact at a time
|
||||
pub fn compaction_max_file_count(&self) -> i64 {
|
||||
self.compaction_max_file_count
|
||||
}
|
||||
|
||||
/// max desired persisted file size by the compactor (estimate)
|
||||
pub fn compaction_max_desired_file_size_bytes(&self) -> i64 {
|
||||
self.compaction_max_desired_file_size_bytes
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks for candidate partitions to compact and spawns tokio tasks to compact as many
|
||||
|
|
|
|||
|
|
@ -392,11 +392,11 @@ impl Config {
|
|||
topic: QUERY_POOL_NAME.to_string(),
|
||||
write_buffer_partition_range_start,
|
||||
write_buffer_partition_range_end,
|
||||
split_percentage: 90,
|
||||
max_concurrent_compaction_size_bytes: 100000,
|
||||
compaction_max_size_bytes: 100000,
|
||||
compaction_max_file_count: 10,
|
||||
compaction_max_number_level_0_files: 3,
|
||||
compaction_max_desired_file_size_bytes: 30000,
|
||||
compaction_percentage_max_file_size: 30,
|
||||
compaction_split_percentage: 80,
|
||||
max_concurrent_compaction_size_bytes: 100000,
|
||||
};
|
||||
|
||||
let querier_config = QuerierConfig {
|
||||
|
|
|
|||
|
|
@ -164,11 +164,11 @@ pub async fn create_compactor_server_type(
|
|||
let parquet_store = ParquetStorage::new(object_store);
|
||||
|
||||
let compactor_config = compactor::handler::CompactorConfig::new(
|
||||
compactor_config.split_percentage,
|
||||
compactor_config.max_concurrent_compaction_size_bytes,
|
||||
compactor_config.compaction_max_size_bytes,
|
||||
compactor_config.compaction_max_file_count,
|
||||
compactor_config.compaction_max_number_level_0_files,
|
||||
compactor_config.compaction_max_desired_file_size_bytes,
|
||||
compactor_config.compaction_percentage_max_file_size,
|
||||
compactor_config.compaction_split_percentage,
|
||||
compactor_config.max_concurrent_compaction_size_bytes,
|
||||
);
|
||||
let compactor_handler = Arc::new(CompactorHandlerImpl::new(
|
||||
sequencers,
|
||||
|
|
|
|||
Loading…
Reference in New Issue