From 2406cdb24b78f8265a87dabf93075708dbd90a89 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 14 Dec 2022 15:08:26 -0500 Subject: [PATCH] feat: Create a compactor2 cli --- clap_blocks/src/compactor2.rs | 232 ++++++++++++++++++ clap_blocks/src/lib.rs | 1 + influxdb_iox/src/commands/run/compactor.rs | 7 + influxdb_iox/src/commands/run/compactor2.rs | 145 +++++++++++ influxdb_iox/src/commands/run/mod.rs | 11 + .../tests/end_to_end_cases/mode_switching.rs | 35 +++ ioxd_compactor/src/lib.rs | 135 ++++++++++ 7 files changed, 566 insertions(+) create mode 100644 clap_blocks/src/compactor2.rs create mode 100644 influxdb_iox/src/commands/run/compactor2.rs diff --git a/clap_blocks/src/compactor2.rs b/clap_blocks/src/compactor2.rs new file mode 100644 index 0000000000..fa7b37c1c4 --- /dev/null +++ b/clap_blocks/src/compactor2.rs @@ -0,0 +1,232 @@ +//! CLI config for compactor2-related commands + +// This is mostly the same as clap_blocks/src/compactor.rs except compactor2 doesn't have +// `--write-buffer-topic`, `--shard-index-range-start`, or `--shard-index-range-end`. +// Also, there isn't a `compactor2 run-once` command yet. + +#![cfg_attr(rustfmt, rustfmt_skip)] // https://github.com/rust-lang/rustfmt/issues/5489 + +/// Create compactor configuration that can have different defaults. The `run compactor` +/// server/service needs different defaults than the `compactor run-once` command, and this macro +/// enables sharing of the parts of the configs that are the same without duplicating the code. +macro_rules! gen_compactor_config { + ( + $name:ident, + // hot_multiple is currently the only flag that has a differing default. Add more macro + // arguments similar to this one if more flags need different defaults. + hot_multiple_default = $hot_multiple_default:literal + $(,)? + ) => { + /// CLI config for compactor + #[derive(Debug, Clone, Copy, clap::Parser)] + pub struct $name { + /// Desired max size of compacted parquet files. 
+ /// It is a target desired value, rather than a guarantee. + /// 1024 * 1024 * 25 = 26,214,400 (25MB) + #[clap( + long = "compaction-max-desired-size-bytes", + env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES", + default_value = "26214400", + action + )] + pub max_desired_file_size_bytes: u64, + + /// Percentage of desired max file size. + /// If the estimated compacted result is too small, no need to split it. + /// This percentage is to determine how small it is: + /// < percentage_max_file_size * max_desired_file_size_bytes: + /// This value must be between (0, 100) + /// Default is 80 + #[clap( + long = "compaction-percentage-max-file_size", + env = "INFLUXDB_IOX_COMPACTION_PERCENTAGE_MAX_FILE_SIZE", + default_value = "80", + action + )] + pub percentage_max_file_size: u16, + + /// Split file percentage + /// If the estimated compacted result is neither too small nor too large, it will be + /// split into 2 files determined by this percentage. + /// . Too small means: < percentage_max_file_size * max_desired_file_size_bytes + /// . Too large means: > max_desired_file_size_bytes + /// . 
Any size in the middle will be considered neither too small nor too large + /// + /// This value must be between (0, 100) + /// Default is 80 + #[clap( + long = "compaction-split-percentage", + env = "INFLUXDB_IOX_COMPACTION_SPLIT_PERCENTAGE", + default_value = "80", + action + )] + pub split_percentage: u16, + + /// Max number of partitions per shard we want to compact per cycle + /// Default: 1 + #[clap( + long = "compaction-max-number-partitions-per-shard", + env = "INFLUXDB_IOX_COMPACTION_MAX_NUMBER_PARTITIONS_PER_SHARD", + default_value = "1", + action + )] + pub max_number_partitions_per_shard: usize, + + /// Min number of recent ingested files a partition needs to be considered for + /// compacting + /// + /// Default: 1 + #[clap( + long = "compaction-min-number-recent-ingested-files-per-partition", + env = "INFLUXDB_IOX_COMPACTION_MIN_NUMBER_RECENT_INGESTED_FILES_PER_PARTITION", + default_value = "1", + action + )] + pub min_number_recent_ingested_files_per_partition: usize, + + /// The multiple of times that compacting hot partitions should run for every one time + /// that compacting cold partitions runs. Set to 1 to compact hot partitions and cold + /// partitions equally. + /// + /// Default is + #[doc = $hot_multiple_default] + #[clap( + long = "compaction-hot-multiple", + env = "INFLUXDB_IOX_COMPACTION_HOT_MULTIPLE", + default_value = $hot_multiple_default, + action + )] + pub hot_multiple: usize, + + /// The memory budget assigned to this compactor. + /// + /// For each partition candidate, we will estimate the memory needed to compact each + /// file and only add more files if their needed estimated memory is below this memory + /// budget. Since we must compact L1 files that overlapped with L0 files, if their + /// total estimated memory does not allow us to compact a part of a partition at all, + /// we will not compact it and will log the partition and its related information in a + /// table in our catalog for further diagnosis of the issue. 
+ /// + /// The number of candidates compacted concurrently is also decided using this + /// estimation and budget. + /// + /// Default is 30 * 1024 * 1024 * 1024 = 32,212,254,720 bytes (30GB). + #[clap( + long = "compaction-memory-budget-bytes", + env = "INFLUXDB_IOX_COMPACTION_MEMORY_BUDGET_BYTES", + default_value = "32212254720", + action + )] + pub memory_budget_bytes: u64, + + /// Minimum number of rows allocated for each record batch fed into DataFusion plan + /// + /// We will use max(parquet_file's row_count, min_num_rows_allocated_per_record_batch_to_datafusion_plan) + /// to estimate number of rows allocated for each record batch fed into DataFusion plan. + /// + #[clap( + long = "compaction-min-rows-allocated-per-record-batch-to-plan", + env = "INFLUXDB_IOX_COMPACTION_MIN_ROWS_PER_RECORD_BATCH_TO_PLAN", + default_value = "8192", + action + )] + pub min_num_rows_allocated_per_record_batch_to_datafusion_plan: u64, + + /// Max number of files to compact per partition + /// + /// Due to limitations of the implementation of the compaction plans + /// there is a hard maximum on the number of files that can be compacted + /// at once. This avoids a wide fan-in multi-way merge in the DataFusion plan + #[clap( + long = "compaction-max-num-compacting-files", + env = "INFLUXDB_IOX_COMPACTION_MAX_COMPACTING_FILES", + default_value = "20", + action + )] + pub max_num_compacting_files: usize, + + /// Max number of files to compact for a partition in which the first file and its + /// overlaps push the file count limit over `max_num_compacting_files`. 
+ /// It's a special case of `max_num_compacting_files` that's higher just for the first + /// file in a partition + #[clap( + long = "compaction-max-num-compacting-files-first-in-partition", + env = "INFLUXDB_IOX_COMPACTION_MAX_COMPACTING_FILES_FIRST_IN_PARTITION", + default_value = "40", + action + )] + pub max_num_compacting_files_first_in_partition: usize, + + /// Number of minutes without a write to a partition before it is considered cold + /// and thus a candidate for compaction + #[clap( + long = "compaction-minutes-without-new-writes-to-be-cold", + env = "INFLUXDB_IOX_COMPACTION_MINUTES_WITHOUT_NEW_WRITE_TO_BE_COLD", + default_value = "480", + action + )] + pub minutes_without_new_writes_to_be_cold: u64, + + /// When querying for partitions with data for hot compaction, how many hours to look + /// back for a first pass. + #[clap( + long = "compaction-hot-partition-hours-threshold-1", + env = "INFLUXDB_IOX_COMPACTION_HOT_PARTITION_HOURS_THRESHOLD_1", + default_value = "4", + action + )] + pub hot_compaction_hours_threshold_1: u64, + + /// When querying for partitions with data for hot compaction, how many hours to look + /// back for a second pass if we found nothing in the first pass. + #[clap( + long = "compaction-hot-partition-hours-threshold-2", + env = "INFLUXDB_IOX_COMPACTION_HOT_PARTITION_HOURS_THRESHOLD_2", + default_value = "24", + action + )] + pub hot_compaction_hours_threshold_2: u64, + + /// Max number of partitions that can be compacted in parallel at once + /// We use memory budget to estimate how many partitions can be compacted in parallel at once. + /// However, we do not want to have that number too large which will cause the high usage of CPU cores + /// and may also lead to inaccuracy of memory estimation. This number is to cap that. 
+ #[clap( + long = "compaction-max-parallel-partitions", + env = "INFLUXDB_IOX_COMPACTION_MAX_PARALLEL_PARTITIONS", + default_value = "20", + action + )] + pub max_parallel_partitions: u64, + } + }; +} + +gen_compactor_config!(Compactor2Config, hot_multiple_default = "4"); + +gen_compactor_config!(Compactor2OnceConfig, hot_multiple_default = "1"); + +impl Compactor2OnceConfig { + /// Convert the run-once flavor of the configuration (`Compactor2OnceConfig`) into the server + /// configuration (`Compactor2Config`) so that run-once can reuse some of the code that the compactor2 server uses. + pub fn into_compactor_config(self) -> Compactor2Config { + Compactor2Config { + max_desired_file_size_bytes: self.max_desired_file_size_bytes, + percentage_max_file_size: self.percentage_max_file_size, + split_percentage: self.split_percentage, + max_number_partitions_per_shard: self.max_number_partitions_per_shard, + min_number_recent_ingested_files_per_partition: self + .min_number_recent_ingested_files_per_partition, + hot_multiple: self.hot_multiple, + memory_budget_bytes: self.memory_budget_bytes, + min_num_rows_allocated_per_record_batch_to_datafusion_plan: self + .min_num_rows_allocated_per_record_batch_to_datafusion_plan, + max_num_compacting_files: self.max_num_compacting_files, + max_num_compacting_files_first_in_partition: self.max_num_compacting_files_first_in_partition, + minutes_without_new_writes_to_be_cold: self.minutes_without_new_writes_to_be_cold, + hot_compaction_hours_threshold_1: self.hot_compaction_hours_threshold_1, + hot_compaction_hours_threshold_2: self.hot_compaction_hours_threshold_2, + max_parallel_partitions: self.max_parallel_partitions, + } + } +} diff --git a/clap_blocks/src/lib.rs b/clap_blocks/src/lib.rs index 04615e98e3..0d166eb0e0 100644 --- a/clap_blocks/src/lib.rs +++ b/clap_blocks/src/lib.rs @@ -14,6 +14,7 @@ )] pub mod catalog_dsn; pub mod compactor; +pub mod compactor2; pub mod ingester; pub mod ingester2; pub mod object_store; diff --git
a/influxdb_iox/src/commands/run/compactor.rs b/influxdb_iox/src/commands/run/compactor.rs index 6313f19d6c..d0ad94cb77 100644 --- a/influxdb_iox/src/commands/run/compactor.rs +++ b/influxdb_iox/src/commands/run/compactor.rs @@ -88,6 +88,13 @@ pub struct Config { } pub async fn command(config: Config) -> Result<(), Error> { + if std::env::var("INFLUXDB_IOX_RPC_MODE").is_ok() { + panic!( + "`INFLUXDB_IOX_RPC_MODE` was specified but `compactor` was the command run. Either unset + `INFLUXDB_IOX_RPC_MODE` or run the `compactor2` command." + ); + } + let common_state = CommonServerState::from_config(config.run_config.clone())?; let time_provider = Arc::new(SystemProvider::new()) as Arc; diff --git a/influxdb_iox/src/commands/run/compactor2.rs b/influxdb_iox/src/commands/run/compactor2.rs new file mode 100644 index 0000000000..a31be39df3 --- /dev/null +++ b/influxdb_iox/src/commands/run/compactor2.rs @@ -0,0 +1,145 @@ +//! Command line options for running compactor2 in RPC write mode + +use iox_query::exec::{Executor, ExecutorConfig}; +use iox_time::{SystemProvider, TimeProvider}; +use object_store::DynObjectStore; +use object_store_metrics::ObjectStoreMetrics; +use observability_deps::tracing::*; +use parquet_file::storage::{ParquetStorage, StorageId}; +use std::collections::HashMap; +use std::sync::Arc; +use thiserror::Error; + +use clap_blocks::object_store::make_object_store; +use clap_blocks::{ + catalog_dsn::CatalogDsnConfig, compactor2::Compactor2Config, run_config::RunConfig, +}; +use ioxd_common::server_type::{CommonServerState, CommonServerStateError}; +use ioxd_common::Service; +use ioxd_compactor::create_compactor2_server_type; + +use crate::process_info::{setup_metric_registry, USIZE_MAX}; + +use super::main; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Run: {0}")] + Run(#[from] main::Error), + + #[error("Invalid config: {0}")] + InvalidConfig(#[from] CommonServerStateError), + + #[error("Catalog error: {0}")] + Catalog(#[from] 
iox_catalog::interface::Error), + + #[error("Catalog DSN error: {0}")] + CatalogDsn(#[from] clap_blocks::catalog_dsn::Error), + + #[error("Cannot parse object store config: {0}")] + ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError), + + #[error("error initializing compactor: {0}")] + Compactor(#[from] ioxd_compactor::Error), +} + +#[derive(Debug, clap::Parser)] +#[clap( + name = "run", + about = "Runs in compactor mode using the RPC write path", + long_about = "Run the IOx compactor server.\n\nThe configuration options below can be \ + set either with the command line flags or with the specified environment \ + variable. If there is a file named '.env' in the current working directory, \ + it is sourced before loading the configuration. + +Configuration is loaded from the following sources (highest precedence first): + - command line arguments + - user set environment variables + - .env file contents + - pre-configured default values" +)] +pub struct Config { + #[clap(flatten)] + pub(crate) run_config: RunConfig, + + #[clap(flatten)] + pub(crate) catalog_dsn: CatalogDsnConfig, + + #[clap(flatten)] + pub(crate) compactor_config: Compactor2Config, + + /// Number of threads to use for the compactor query execution, compaction and persistence. + #[clap( + long = "query-exec-thread-count", + env = "INFLUXDB_IOX_QUERY_EXEC_THREAD_COUNT", + default_value = "4", + action + )] + pub query_exec_thread_count: usize, + + /// Size of memory pool used during query exec, in bytes. + #[clap( + long = "exec-mem-pool-bytes", + env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES", + default_value = &USIZE_MAX[..], + action + )] + pub exec_mem_pool_bytes: usize, +} + +pub async fn command(config: Config) -> Result<(), Error> { + if std::env::var("INFLUXDB_IOX_RPC_MODE").is_err() { + panic!( + "`INFLUXDB_IOX_RPC_MODE` was not specified but `compactor2` was the command run. Either set + `INFLUXDB_IOX_RPC_MODE` or run the `compactor` command." 
+ ); + } + + let common_state = CommonServerState::from_config(config.run_config.clone())?; + + let time_provider = Arc::new(SystemProvider::new()) as Arc; + let metric_registry = setup_metric_registry(); + let catalog = config + .catalog_dsn + .get_catalog("compactor", Arc::clone(&metric_registry)) + .await?; + + let object_store = make_object_store(config.run_config.object_store_config()) + .map_err(Error::ObjectStoreParsing)?; + + // Decorate the object store with a metric recorder. + let object_store: Arc = Arc::new(ObjectStoreMetrics::new( + object_store, + Arc::clone(&time_provider), + &metric_registry, + )); + + let parquet_store = ParquetStorage::new(object_store, StorageId::from("iox")); + + let exec = Arc::new(Executor::new_with_config(ExecutorConfig { + num_threads: config.query_exec_thread_count, + target_query_partitions: config.query_exec_thread_count, + object_stores: HashMap::from([( + parquet_store.id(), + Arc::clone(parquet_store.object_store()), + )]), + mem_pool_size: config.exec_mem_pool_bytes, + })); + let time_provider = Arc::new(SystemProvider::new()); + + let server_type = create_compactor2_server_type( + &common_state, + Arc::clone(&metric_registry), + catalog, + parquet_store, + exec, + time_provider, + config.compactor_config, + ) + .await?; + + info!("starting compactor"); + + let services = vec![Service::create(server_type, common_state.run_config())]; + Ok(main::main(common_state, services, metric_registry).await?) 
+} diff --git a/influxdb_iox/src/commands/run/mod.rs b/influxdb_iox/src/commands/run/mod.rs index 7c9fa42fdb..f1b5d34187 100644 --- a/influxdb_iox/src/commands/run/mod.rs +++ b/influxdb_iox/src/commands/run/mod.rs @@ -3,6 +3,7 @@ use trogging::cli::LoggingConfig; pub(crate) mod all_in_one; mod compactor; +mod compactor2; mod garbage_collector; mod ingester; mod ingester2; @@ -18,6 +19,9 @@ pub enum Error { #[snafu(display("Error in compactor subcommand: {}", source))] CompactorError { source: compactor::Error }, + #[snafu(display("Error in compactor2 subcommand: {}", source))] + Compactor2Error { source: compactor2::Error }, + #[snafu(display("Error in garbage collector subcommand: {}", source))] GarbageCollectorError { source: garbage_collector::Error }, @@ -60,6 +64,7 @@ impl Config { match &self.command { None => &self.all_in_one_config.logging_config, Some(Command::Compactor(config)) => config.run_config.logging_config(), + Some(Command::Compactor2(config)) => config.run_config.logging_config(), Some(Command::GarbageCollector(config)) => config.run_config.logging_config(), Some(Command::Querier(config)) => config.run_config.logging_config(), Some(Command::Router(config)) => config.run_config.logging_config(), @@ -77,6 +82,9 @@ enum Command { /// Run the server in compactor mode Compactor(compactor::Config), + /// Run the server in compactor2 mode + Compactor2(compactor2::Config), + /// Run the server in querier mode Querier(querier::Config), @@ -110,6 +118,9 @@ pub async fn command(config: Config) -> Result<()> { Some(Command::Compactor(config)) => { compactor::command(config).await.context(CompactorSnafu) } + Some(Command::Compactor2(config)) => { + compactor2::command(config).await.context(Compactor2Snafu) + } Some(Command::GarbageCollector(config)) => garbage_collector::command(config) .await .context(GarbageCollectorSnafu), diff --git a/influxdb_iox/tests/end_to_end_cases/mode_switching.rs b/influxdb_iox/tests/end_to_end_cases/mode_switching.rs index 
a541852a19..86d8c1e113 100644 --- a/influxdb_iox/tests/end_to_end_cases/mode_switching.rs +++ b/influxdb_iox/tests/end_to_end_cases/mode_switching.rs @@ -172,3 +172,38 @@ fn querier_without_ingesters_with_mode_env_var_uses_rpc_write() { .failure() .stdout(predicate::str::contains("using the RPC write path")); } + +#[test] +fn compactor_errors_with_mode_env_var() { + Command::cargo_bin("influxdb_iox") + .unwrap() + .env_clear() + .env("INFLUXDB_IOX_RPC_MODE", "2") + .arg("run") + .arg("compactor") + .arg("--shard-index-range-start") + .arg("0") + .arg("--shard-index-range-end") + .arg("1") + .timeout(Duration::from_secs(2)) + .assert() + .failure() + .stderr(predicate::str::contains( + "`INFLUXDB_IOX_RPC_MODE` was specified but `compactor` was the command run", + )); +} + +#[test] +fn compactor2_errors_without_mode_env_var() { + Command::cargo_bin("influxdb_iox") + .unwrap() + .env_clear() + .arg("run") + .arg("compactor2") + .timeout(Duration::from_secs(2)) + .assert() + .failure() + .stderr(predicate::str::contains( + "`INFLUXDB_IOX_RPC_MODE` was not specified but `compactor2` was the command run", + )); +} \ No newline at end of file diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index 0b91909068..e3d90bec0a 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -132,7 +132,123 @@ impl HttpApiErrorSource for IoxHttpError { } } +/// Instantiate a compactor server that uses the RPC write path +// NOTE!!! This needs to be kept in sync with `create_compactor_server_type` until the switch to +// the RPC write path mode is complete! See the annotations about where these two functions line up +// and where they diverge. 
+pub async fn create_compactor2_server_type( + common_state: &CommonServerState, + metric_registry: Arc, + catalog: Arc, + parquet_store: ParquetStorage, + exec: Arc, + time_provider: Arc, + // Parameter difference: this function takes a `Compactor2Config` that doesn't have a write + // buffer topic or shard indexes. All the other parameters here are the same. + compactor_config: Compactor2Config, +) -> Result> { + let grpc_catalog = Arc::clone(&catalog); + + // Setup difference: build a compactor2 instead, which expects a `Compactor2Config`. + let compactor = build_compactor2_from_config( + compactor_config, + catalog, + parquet_store, + exec, + time_provider, + Arc::clone(&metric_registry), + ) + .await?; + + let compactor_handler = Arc::new(CompactorHandlerImpl::new(Arc::new(compactor))); + + let grpc = GrpcDelegate::new(grpc_catalog, Arc::clone(&compactor_handler)); + + let compactor = CompactorServer::new(metric_registry, grpc, compactor_handler); + Ok(Arc::new(CompactorServerType::new(compactor, common_state))) +} + +// NOTE!!! This needs to be kept in sync with `build_compactor_from_config` until the switch to +// the RPC write path mode is complete! See the annotations about where these two functions line up +// and where they diverge. +pub async fn build_compactor2_from_config( + // Parameter difference: this function takes a `Compactor2Config` that doesn't have a write + // buffer topic or shard indexes. All the other parameters here are the same. + compactor_config: Compactor2Config, + catalog: Arc, + parquet_store: ParquetStorage, + exec: Arc, + time_provider: Arc, + metric_registry: Arc, +) -> Result { + // 1. Shard index range checking: MISSING + // This function doesn't check the shard index range here like `build_compactor_from_config` + // does because shard indexes aren't relevant to `compactor2`. + + // 2. 
Split percentage value range checking + if compactor_config.split_percentage < 1 || compactor_config.split_percentage > 100 { + return Err(Error::SplitPercentageRange { + split_percentage: compactor_config.split_percentage, + }); + } + + // 3. Ensure topic and shard indexes are in the catalog: MISSING + // This isn't relevant to `compactor2`. + + // 4. Convert config type to handler config type + let Compactor2Config { + max_desired_file_size_bytes, + percentage_max_file_size, + split_percentage, + max_number_partitions_per_shard, + min_number_recent_ingested_files_per_partition, + hot_multiple, + memory_budget_bytes, + min_num_rows_allocated_per_record_batch_to_datafusion_plan, + max_num_compacting_files, + max_num_compacting_files_first_in_partition, + minutes_without_new_writes_to_be_cold, + hot_compaction_hours_threshold_1, + hot_compaction_hours_threshold_2, + max_parallel_partitions, + } = compactor_config; + + let compactor_config = compactor::handler::CompactorConfig { + max_desired_file_size_bytes, + percentage_max_file_size, + split_percentage, + max_number_partitions_per_shard, + min_number_recent_ingested_files_per_partition, + hot_multiple, + memory_budget_bytes, + min_num_rows_allocated_per_record_batch_to_datafusion_plan, + max_num_compacting_files, + max_num_compacting_files_first_in_partition, + minutes_without_new_writes_to_be_cold, + hot_compaction_hours_threshold_1, + hot_compaction_hours_threshold_2, + max_parallel_partitions, + }; + // 4. END + + // 5. Create a new compactor: `compactor2` is assigned all shards (this argument can go away + // completely when the switch to RPC mode is complete) + Ok(compactor::compact::Compactor::new( + compactor::compact::ShardAssignment::All, + catalog, + parquet_store, + exec, + time_provider, + backoff::BackoffConfig::default(), + compactor_config, + metric_registry, + )) +} + /// Instantiate a compactor server +// NOTE!!! 
This needs to be kept in sync with `create_compactor2_server_type` until the switch to +// the RPC write path mode is complete! See the annotations about where these two functions line up +// and where they diverge. pub async fn create_compactor_server_type( common_state: &CommonServerState, metric_registry: Arc, @@ -140,9 +256,13 @@ pub async fn create_compactor_server_type( parquet_store: ParquetStorage, exec: Arc, time_provider: Arc, + // Parameter difference: this function takes a `CompactorConfig` that has a write buffer topic + // and requires shard indexes. All the other parameters here are the same. compactor_config: CompactorConfig, ) -> Result> { let grpc_catalog = Arc::clone(&catalog); + + // Setup difference: build a compactor instead, which expects a `CompactorConfig`. let compactor = build_compactor_from_config( compactor_config, catalog, @@ -161,7 +281,12 @@ pub async fn create_compactor_server_type( Ok(Arc::new(CompactorServerType::new(compactor, common_state))) } +// NOTE!!! This needs to be kept in sync with `build_compactor2_from_config` until the switch to +// the RPC write path mode is complete! See the annotations about where these two functions line up +// and where they diverge. pub async fn build_compactor_from_config( + // Parameter difference: this function takes a `CompactorConfig` that has a write buffer topic + // and requires shard indexes. All the other parameters here are the same. compactor_config: CompactorConfig, catalog: Arc, parquet_store: ParquetStorage, @@ -169,16 +294,22 @@ pub async fn build_compactor_from_config( time_provider: Arc, metric_registry: Arc, ) -> Result { + // 1. Shard index range checking + // This function checks the shard index range; `compactor2` doesn't have shard indexes so + // `build_compactor2_from_config` doesn't have this check. if compactor_config.shard_index_range_start > compactor_config.shard_index_range_end { return Err(Error::ShardIndexRange); } + // 2. 
Split percentage value range checking if compactor_config.split_percentage < 1 || compactor_config.split_percentage > 100 { return Err(Error::SplitPercentageRange { split_percentage: compactor_config.split_percentage, }); } + // 3. Ensure topic and shard indexes are in the catalog + // This isn't relevant to `compactor2`. let mut txn = catalog.start_transaction().await?; let topic = txn .topics() @@ -199,7 +330,9 @@ pub async fn build_compactor_from_config( shards.push(s.id); } txn.commit().await?; + // 3. END + // 4. Convert config type to handler config type let CompactorConfig { max_desired_file_size_bytes, percentage_max_file_size, @@ -240,7 +373,9 @@ pub async fn build_compactor_from_config( warm_compaction_small_size_threshold_bytes, warm_compaction_min_small_file_count, }; + // 4. END + // 5. Create a new compactor: Assigned only those shards specified in the CLI args. Ok(compactor::compact::Compactor::new( shards, catalog,