feat: Create a compactor2 cli

pull/24376/head
Carol (Nichols || Goulding) 2022-12-14 15:08:26 -05:00
parent c4bcfff80f
commit 2406cdb24b
7 changed files with 566 additions and 0 deletions


@@ -0,0 +1,232 @@
//! CLI config for compactor2-related commands
// This is mostly the same as clap_blocks/src/compactor.rs except compactor2 doesn't have
// `--write-buffer-topic`, `--shard-index-range-start`, or `--shard-index-range-end`.
// Also, there isn't a `compactor2 run-once` command yet.
#![cfg_attr(rustfmt, rustfmt_skip)] // https://github.com/rust-lang/rustfmt/issues/5489
/// Generate a compactor config struct so that different commands can have different
/// defaults. The `run compactor` server/service needs different defaults than the
/// `compactor run-once` command, and this macro shares the parts of the configs that
/// are the same without duplicating code.
macro_rules! gen_compactor_config {
(
$name:ident,
// hot_multiple is currently the only flag that has a differing default. Add more macro
// arguments similar to this one if more flags need different defaults.
hot_multiple_default = $hot_multiple_default:literal
$(,)?
) => {
/// CLI config for compactor
#[derive(Debug, Clone, Copy, clap::Parser)]
pub struct $name {
/// Desired max size of compacted parquet files.
/// This is a target, not a guarantee.
/// 1024 * 1024 * 25 = 26,214,400 bytes (25 MiB)
#[clap(
long = "compaction-max-desired-size-bytes",
env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES",
default_value = "26214400",
action
)]
pub max_desired_file_size_bytes: u64,
/// Percentage of the desired max file size used to decide whether a compacted
/// result is too small to be worth splitting. A result is considered too small
/// when it is < percentage_max_file_size * max_desired_file_size_bytes.
///
/// This value must be in the range (0, 100). Default is 80.
#[clap(
long = "compaction-percentage-max-file_size",
env = "INFLUXDB_IOX_COMPACTION_PERCENTAGE_MAX_FILE_SIZE",
default_value = "80",
action
)]
pub percentage_max_file_size: u16,
/// Split file percentage
/// If the estimated compacted result is neither too small nor too large, it will be
/// split into 2 files determined by this percentage:
/// - Too small means: < percentage_max_file_size * max_desired_file_size_bytes
/// - Too large means: > max_desired_file_size_bytes
/// - Anything in between is neither too small nor too large
///
/// This value must be between 1 and 100 (inclusive). Default is 80.
#[clap(
long = "compaction-split-percentage",
env = "INFLUXDB_IOX_COMPACTION_SPLIT_PERCENTAGE",
default_value = "80",
action
)]
pub split_percentage: u16,
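// Editor's illustration (not part of the commit): with the defaults above, the
// "too small" cutoff is 80% * 26,214,400 = 20,971,520 bytes (20 MiB). An estimated
// output below that is written as a single file, one above 26,214,400 bytes is too
// large, and anything in between is split into two files at the 80% point.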
/// Max number of partitions per shard we want to compact per cycle
/// Default: 1
#[clap(
long = "compaction-max-number-partitions-per-shard",
env = "INFLUXDB_IOX_COMPACTION_MAX_NUMBER_PARTITIONS_PER_SHARD",
default_value = "1",
action
)]
pub max_number_partitions_per_shard: usize,
/// Min number of recent ingested files a partition needs to be considered for
/// compacting
///
/// Default: 1
#[clap(
long = "compaction-min-number-recent-ingested-files-per-partition",
env = "INFLUXDB_IOX_COMPACTION_MIN_NUMBER_RECENT_INGESTED_FILES_PER_PARTITION",
default_value = "1",
action
)]
pub min_number_recent_ingested_files_per_partition: usize,
/// How many times hot-partition compaction should run for every one run of
/// cold-partition compaction. Set to 1 to compact hot and cold partitions equally.
///
/// Default is
#[doc = $hot_multiple_default]
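// `#[doc = ...]` splices the macro's literal argument into the rustdoc text, so
// the rendered docs show the actual default for each generated struct.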
#[clap(
long = "compaction-hot-multiple",
env = "INFLUXDB_IOX_COMPACTION_HOT_MULTIPLE",
default_value = $hot_multiple_default,
action
)]
pub hot_multiple: usize,
/// The memory budget assigned to this compactor.
///
/// For each partition candidate, we estimate the memory needed to compact each file
/// and add files only while their total estimated memory stays within this budget.
/// Because L1 files that overlap with L0 files must be compacted together, if the
/// estimate does not allow us to compact any part of a partition, we skip it and
/// log the partition and its related information in a catalog table for further
/// diagnosis.
///
/// The number of candidates compacted concurrently is also decided using this
/// estimation and budget.
///
/// Default is 30 * 1024 * 1024 * 1024 = 32,212,254,720 bytes (30 GiB).
#[clap(
long = "compaction-memory-budget-bytes",
env = "INFLUXDB_IOX_COMPACTION_MEMORY_BUDGET_BYTES",
default_value = "32212254720",
action
)]
pub memory_budget_bytes: u64,
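// Editor's sketch (hypothetical, not in the commit) of the greedy selection rule
// the doc comment above describes:
//
//     let mut used = 0u64;
//     let picked: Vec<_> = candidate_files
//         .iter()
//         .take_while(|f| {
//             used += f.estimated_memory_bytes; // hypothetical per-file estimate
//             used <= memory_budget_bytes
//         })
//         .collect();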
/// Minimum number of rows allocated for each record batch fed into the DataFusion plan
///
/// We use max(parquet_file's row_count, min_num_rows_allocated_per_record_batch_to_datafusion_plan)
/// to estimate the number of rows allocated for each record batch fed into the DataFusion plan.
#[clap(
long = "compaction-min-rows-allocated-per-record-batch-to-plan",
env = "INFLUXDB_IOX_COMPACTION_MIN_ROWS_PER_RECORD_BATCH_TO_PLAN",
default_value = "8192",
action
)]
pub min_num_rows_allocated_per_record_batch_to_datafusion_plan: u64,
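// Editor's illustration (not part of the commit): with the default of 8192, a
// parquet file with 1,000 rows is estimated at max(1_000, 8_192) = 8,192 rows per
// record batch, while a file with 100,000 rows is estimated at 100,000.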
/// Max number of files to compact per partition
///
/// Due to limitations in the implementation of the compaction plans, there is a
/// hard maximum on the number of files that can be compacted at once. This avoids
/// a wide fan-in multi-way merge in the DataFusion plan.
#[clap(
long = "compaction-max-num-compacting-files",
env = "INFLUXDB_IOX_COMPACTION_MAX_COMPACTING_FILES",
default_value = "20",
action
)]
pub max_num_compacting_files: usize,
/// Max number of files to compact for a partition in which the first file and its
/// overlaps push the file count limit over `max_num_compacting_files`.
/// It's a special case of `max_num_compacting_files` with a higher limit, applied
/// only to the first file in a partition.
#[clap(
long = "compaction-max-num-compacting-files-first-in-partition",
env = "INFLUXDB_IOX_COMPACTION_MAX_COMPACTING_FILES_FIRST_IN_PARTITION",
default_value = "40",
action
)]
pub max_num_compacting_files_first_in_partition: usize,
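// Editor's illustration (not part of the commit): with the defaults, a partition
// whose first file overlaps 30 other files can still compact all 31 together
// (31 <= 40), even though later groups in that partition are capped at 20.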
/// Number of minutes without a write to a partition before it is considered cold
/// and thus a candidate for compaction
#[clap(
long = "compaction-minutes-without-new-writes-to-be-cold",
env = "INFLUXDB_IOX_COMPACTION_MINUTES_WITHOUT_NEW_WRITE_TO_BE_COLD",
default_value = "480",
action
)]
pub minutes_without_new_writes_to_be_cold: u64,
/// When querying for partitions with data for hot compaction, how many hours to look
/// back for a first pass.
#[clap(
long = "compaction-hot-partition-hours-threshold-1",
env = "INFLUXDB_IOX_COMPACTION_HOT_PARTITION_HOURS_THRESHOLD_1",
default_value = "4",
action
)]
pub hot_compaction_hours_threshold_1: u64,
/// When querying for partitions with data for hot compaction, how many hours to look
/// back for a second pass if we found nothing in the first pass.
#[clap(
long = "compaction-hot-partition-hours-threshold-2",
env = "INFLUXDB_IOX_COMPACTION_HOT_PARTITION_HOURS_THRESHOLD_2",
default_value = "24",
action
)]
pub hot_compaction_hours_threshold_2: u64,
/// Max number of partitions that can be compacted in parallel at once.
/// We use the memory budget to estimate how many partitions can be compacted in
/// parallel, but we don't want that number to grow too large: it would drive high
/// CPU usage and could make the memory estimation less accurate. This setting caps it.
#[clap(
long = "compaction-max-parallel-partitions",
env = "INFLUXDB_IOX_COMPACTION_MAX_PARALLEL_PARTITIONS",
default_value = "20",
action
)]
pub max_parallel_partitions: u64,
}
};
}
gen_compactor_config!(Compactor2Config, hot_multiple_default = "4");
gen_compactor_config!(Compactor2OnceConfig, hot_multiple_default = "1");
impl Compactor2OnceConfig {
/// Convert the configuration for `compactor run-once` into the configuration for `run
/// compactor` so that run-once can reuse some of the code that the compactor server uses.
pub fn into_compactor_config(self) -> Compactor2Config {
Compactor2Config {
max_desired_file_size_bytes: self.max_desired_file_size_bytes,
percentage_max_file_size: self.percentage_max_file_size,
split_percentage: self.split_percentage,
max_number_partitions_per_shard: self.max_number_partitions_per_shard,
min_number_recent_ingested_files_per_partition: self
.min_number_recent_ingested_files_per_partition,
hot_multiple: self.hot_multiple,
memory_budget_bytes: self.memory_budget_bytes,
min_num_rows_allocated_per_record_batch_to_datafusion_plan: self
.min_num_rows_allocated_per_record_batch_to_datafusion_plan,
max_num_compacting_files: self.max_num_compacting_files,
max_num_compacting_files_first_in_partition: self.max_num_compacting_files_first_in_partition,
minutes_without_new_writes_to_be_cold: self.minutes_without_new_writes_to_be_cold,
hot_compaction_hours_threshold_1: self.hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2: self.hot_compaction_hours_threshold_2,
max_parallel_partitions: self.max_parallel_partitions,
}
}
}
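
A minimal sketch (editor's addition, not part of the commit) of how the two
generated configs could be exercised, assuming clap's `derive` and `env` features
and a clean environment:

use clap::Parser;

#[test]
fn generated_configs_differ_only_in_hot_multiple_default() {
    // Parsing with no flags picks up each variant's defaults from the macro.
    let server = Compactor2Config::parse_from(["compactor2"]);
    assert_eq!(server.hot_multiple, 4);

    let once = Compactor2OnceConfig::parse_from(["compactor2-once"]);
    assert_eq!(once.hot_multiple, 1);

    // The run-once config converts into the server config field-for-field.
    assert_eq!(once.into_compactor_config().hot_multiple, 1);
}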


@@ -14,6 +14,7 @@
)]
pub mod catalog_dsn;
pub mod compactor;
pub mod compactor2;
pub mod ingester;
pub mod ingester2;
pub mod object_store;


@@ -88,6 +88,13 @@ pub struct Config {
}
pub async fn command(config: Config) -> Result<(), Error> {
if std::env::var("INFLUXDB_IOX_RPC_MODE").is_ok() {
panic!(
"`INFLUXDB_IOX_RPC_MODE` was specified but `compactor` was the command run. Either unset
`INFLUXDB_IOX_RPC_MODE` or run the `compactor2` command."
);
}
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let time_provider = Arc::new(SystemProvider::new()) as Arc<dyn TimeProvider>;


@@ -0,0 +1,145 @@
//! Command line options for running compactor2 in RPC write mode
use iox_query::exec::{Executor, ExecutorConfig};
use iox_time::{SystemProvider, TimeProvider};
use object_store::DynObjectStore;
use object_store_metrics::ObjectStoreMetrics;
use observability_deps::tracing::*;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::collections::HashMap;
use std::sync::Arc;
use thiserror::Error;
use clap_blocks::object_store::make_object_store;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, compactor2::Compactor2Config, run_config::RunConfig,
};
use ioxd_common::server_type::{CommonServerState, CommonServerStateError};
use ioxd_common::Service;
use ioxd_compactor::create_compactor2_server_type;
use crate::process_info::{setup_metric_registry, USIZE_MAX};
use super::main;
#[derive(Debug, Error)]
pub enum Error {
#[error("Run: {0}")]
Run(#[from] main::Error),
#[error("Invalid config: {0}")]
InvalidConfig(#[from] CommonServerStateError),
#[error("Catalog error: {0}")]
Catalog(#[from] iox_catalog::interface::Error),
#[error("Catalog DSN error: {0}")]
CatalogDsn(#[from] clap_blocks::catalog_dsn::Error),
#[error("Cannot parse object store config: {0}")]
ObjectStoreParsing(#[from] clap_blocks::object_store::ParseError),
#[error("error initializing compactor: {0}")]
Compactor(#[from] ioxd_compactor::Error),
}
#[derive(Debug, clap::Parser)]
#[clap(
name = "run",
about = "Runs in compactor mode using the RPC write path",
long_about = "Run the IOx compactor server.\n\nThe configuration options below can be \
set either with the command line flags or with the specified environment \
variable. If there is a file named '.env' in the current working directory, \
it is sourced before loading the configuration.
Configuration is loaded from the following sources (highest precedence first):
- command line arguments
- user set environment variables
- .env file contents
- pre-configured default values"
)]
pub struct Config {
#[clap(flatten)]
pub(crate) run_config: RunConfig,
#[clap(flatten)]
pub(crate) catalog_dsn: CatalogDsnConfig,
#[clap(flatten)]
pub(crate) compactor_config: Compactor2Config,
/// Number of threads to use for the compactor's query execution, compaction, and persistence.
#[clap(
long = "query-exec-thread-count",
env = "INFLUXDB_IOX_QUERY_EXEC_THREAD_COUNT",
default_value = "4",
action
)]
pub query_exec_thread_count: usize,
/// Size of memory pool used during query exec, in bytes.
#[clap(
long = "exec-mem-pool-bytes",
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
default_value = &USIZE_MAX[..],
action
)]
pub exec_mem_pool_bytes: usize,
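// `USIZE_MAX` is assumed here to be a pre-formatted static string of `usize::MAX`
// from `process_info`, making the default effectively "no limit".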
}
pub async fn command(config: Config) -> Result<(), Error> {
if std::env::var("INFLUXDB_IOX_RPC_MODE").is_err() {
panic!(
"`INFLUXDB_IOX_RPC_MODE` was not specified but `compactor2` was the command run. Either set
`INFLUXDB_IOX_RPC_MODE` or run the `compactor` command."
);
}
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let time_provider = Arc::new(SystemProvider::new()) as Arc<dyn TimeProvider>;
let metric_registry = setup_metric_registry();
let catalog = config
.catalog_dsn
.get_catalog("compactor", Arc::clone(&metric_registry))
.await?;
let object_store = make_object_store(config.run_config.object_store_config())
.map_err(Error::ObjectStoreParsing)?;
// Decorate the object store with a metric recorder.
let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreMetrics::new(
object_store,
Arc::clone(&time_provider),
&metric_registry,
));
let parquet_store = ParquetStorage::new(object_store, StorageId::from("iox"));
let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
num_threads: config.query_exec_thread_count,
target_query_partitions: config.query_exec_thread_count,
object_stores: HashMap::from([(
parquet_store.id(),
Arc::clone(parquet_store.object_store()),
)]),
mem_pool_size: config.exec_mem_pool_bytes,
}));
let time_provider = Arc::new(SystemProvider::new());
let server_type = create_compactor2_server_type(
&common_state,
Arc::clone(&metric_registry),
catalog,
parquet_store,
exec,
time_provider,
config.compactor_config,
)
.await?;
info!("starting compactor");
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(main::main(common_state, services, metric_registry).await?)
}
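
The `compactor` and `compactor2` commands gate on `INFLUXDB_IOX_RPC_MODE` in mirror
image. A minimal sketch (editor's addition; this helper does not exist in the
commit) of the shared idea:

fn require_rpc_mode(expect_set: bool, this_cmd: &str, other_cmd: &str) {
    let is_set = std::env::var("INFLUXDB_IOX_RPC_MODE").is_ok();
    if is_set != expect_set {
        // Mirrors the panic messages in the two command functions above.
        panic!(
            "`INFLUXDB_IOX_RPC_MODE` does not match the `{this_cmd}` command; \
             either change the variable or run `{other_cmd}`"
        );
    }
}

// `compactor` would call require_rpc_mode(false, "compactor", "compactor2");
// `compactor2` would call require_rpc_mode(true, "compactor2", "compactor").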


@@ -3,6 +3,7 @@ use trogging::cli::LoggingConfig;
pub(crate) mod all_in_one;
mod compactor;
mod compactor2;
mod garbage_collector;
mod ingester;
mod ingester2;
@@ -18,6 +19,9 @@ pub enum Error {
#[snafu(display("Error in compactor subcommand: {}", source))]
CompactorError { source: compactor::Error },
#[snafu(display("Error in compactor2 subcommand: {}", source))]
Compactor2Error { source: compactor2::Error },
#[snafu(display("Error in garbage collector subcommand: {}", source))]
GarbageCollectorError { source: garbage_collector::Error },
@@ -60,6 +64,7 @@ impl Config {
match &self.command {
None => &self.all_in_one_config.logging_config,
Some(Command::Compactor(config)) => config.run_config.logging_config(),
Some(Command::Compactor2(config)) => config.run_config.logging_config(),
Some(Command::GarbageCollector(config)) => config.run_config.logging_config(),
Some(Command::Querier(config)) => config.run_config.logging_config(),
Some(Command::Router(config)) => config.run_config.logging_config(),
@@ -77,6 +82,9 @@ enum Command {
/// Run the server in compactor mode
Compactor(compactor::Config),
/// Run the server in compactor2 mode
Compactor2(compactor2::Config),
/// Run the server in querier mode
Querier(querier::Config),
@@ -110,6 +118,9 @@ pub async fn command(config: Config) -> Result<()> {
Some(Command::Compactor(config)) => {
compactor::command(config).await.context(CompactorSnafu)
}
Some(Command::Compactor2(config)) => {
compactor2::command(config).await.context(Compactor2Snafu)
}
Some(Command::GarbageCollector(config)) => garbage_collector::command(config)
.await
.context(GarbageCollectorSnafu),


@@ -172,3 +172,38 @@ fn querier_without_ingesters_with_mode_env_var_uses_rpc_write() {
.failure()
.stdout(predicate::str::contains("using the RPC write path"));
}
#[test]
fn compactor_errors_with_mode_env_var() {
Command::cargo_bin("influxdb_iox")
.unwrap()
.env_clear()
.env("INFLUXDB_IOX_RPC_MODE", "2")
.arg("run")
.arg("compactor")
.arg("--shard-index-range-start")
.arg("0")
.arg("--shard-index-range-end")
.arg("1")
.timeout(Duration::from_secs(2))
.assert()
.failure()
.stderr(predicate::str::contains(
"`INFLUXDB_IOX_RPC_MODE` was specified but `compactor` was the command run",
));
}
#[test]
fn compactor2_errors_without_mode_env_var() {
Command::cargo_bin("influxdb_iox")
.unwrap()
.env_clear()
.arg("run")
.arg("compactor2")
.timeout(Duration::from_secs(2))
.assert()
.failure()
.stderr(predicate::str::contains(
"`INFLUXDB_IOX_RPC_MODE` was not specified but `compactor2` was the command run",
));
}


@@ -132,7 +132,123 @@ impl HttpApiErrorSource for IoxHttpError {
}
}
/// Instantiate a compactor server that uses the RPC write path
// NOTE!!! This needs to be kept in sync with `create_compactor_server_type` until the switch to
// the RPC write path mode is complete! See the annotations about where these two functions line up
// and where they diverge.
pub async fn create_compactor2_server_type(
common_state: &CommonServerState,
metric_registry: Arc<metric::Registry>,
catalog: Arc<dyn Catalog>,
parquet_store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
// Parameter difference: this function takes a `Compactor2Config` that doesn't have a write
// buffer topic or shard indexes. All the other parameters here are the same.
compactor_config: Compactor2Config,
) -> Result<Arc<dyn ServerType>> {
let grpc_catalog = Arc::clone(&catalog);
// Setup difference: build a compactor2 instead, which expects a `Compactor2Config`.
let compactor = build_compactor2_from_config(
compactor_config,
catalog,
parquet_store,
exec,
time_provider,
Arc::clone(&metric_registry),
)
.await?;
let compactor_handler = Arc::new(CompactorHandlerImpl::new(Arc::new(compactor)));
let grpc = GrpcDelegate::new(grpc_catalog, Arc::clone(&compactor_handler));
let compactor = CompactorServer::new(metric_registry, grpc, compactor_handler);
Ok(Arc::new(CompactorServerType::new(compactor, common_state)))
}
// NOTE!!! This needs to be kept in sync with `build_compactor_from_config` until the switch to
// the RPC write path mode is complete! See the annotations about where these two functions line up
// and where they diverge.
pub async fn build_compactor2_from_config(
// Parameter difference: this function takes a `Compactor2Config` that doesn't have a write
// buffer topic or shard indexes. All the other parameters here are the same.
compactor_config: Compactor2Config,
catalog: Arc<dyn Catalog>,
parquet_store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
metric_registry: Arc<Registry>,
) -> Result<compactor::compact::Compactor, Error> {
// 1. Shard index range checking: MISSING
// This function doesn't check the shard index range here like `build_compactor_from_config`
// does because shard indexes aren't relevant to `compactor2`.
// 2. Split percentage value range checking
if compactor_config.split_percentage < 1 || compactor_config.split_percentage > 100 {
return Err(Error::SplitPercentageRange {
split_percentage: compactor_config.split_percentage,
});
}
// 3. Ensure topic and shard indexes are in the catalog: MISSING
// This isn't relevant to `compactor2`.
// 4. Convert config type to handler config type
let Compactor2Config {
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,
max_number_partitions_per_shard,
min_number_recent_ingested_files_per_partition,
hot_multiple,
memory_budget_bytes,
min_num_rows_allocated_per_record_batch_to_datafusion_plan,
max_num_compacting_files,
max_num_compacting_files_first_in_partition,
minutes_without_new_writes_to_be_cold,
hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2,
max_parallel_partitions,
} = compactor_config;
let compactor_config = compactor::handler::CompactorConfig {
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,
max_number_partitions_per_shard,
min_number_recent_ingested_files_per_partition,
hot_multiple,
memory_budget_bytes,
min_num_rows_allocated_per_record_batch_to_datafusion_plan,
max_num_compacting_files,
max_num_compacting_files_first_in_partition,
minutes_without_new_writes_to_be_cold,
hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2,
max_parallel_partitions,
};
// 4. END
// 5. Create a new compactor: `compactor2` is assigned all shards (this argument can go away
// completely when the switch to RPC mode is complete)
Ok(compactor::compact::Compactor::new(
compactor::compact::ShardAssignment::All,
catalog,
parquet_store,
exec,
time_provider,
backoff::BackoffConfig::default(),
compactor_config,
metric_registry,
))
}
/// Instantiate a compactor server
// NOTE!!! This needs to be kept in sync with `create_compactor2_server_type` until the switch to
// the RPC write path mode is complete! See the annotations about where these two functions line up
// and where they diverge.
pub async fn create_compactor_server_type(
common_state: &CommonServerState,
metric_registry: Arc<metric::Registry>,
@@ -140,9 +256,13 @@ pub async fn create_compactor_server_type(
parquet_store: ParquetStorage,
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
// Parameter difference: this function takes a `CompactorConfig` that has a write buffer topic
// and requires shard indexes. All the other parameters here are the same.
compactor_config: CompactorConfig,
) -> Result<Arc<dyn ServerType>> {
let grpc_catalog = Arc::clone(&catalog);
// Setup difference: build a compactor instead, which expects a `CompactorConfig`.
let compactor = build_compactor_from_config(
compactor_config,
catalog,
@@ -161,7 +281,12 @@ pub async fn create_compactor_server_type(
Ok(Arc::new(CompactorServerType::new(compactor, common_state)))
}
// NOTE!!! This needs to be kept in sync with `build_compactor2_from_config` until the switch to
// the RPC write path mode is complete! See the annotations about where these two functions line up
// and where they diverge.
pub async fn build_compactor_from_config(
// Parameter difference: this function takes a `CompactorConfig` that has a write buffer topic
// and requires shard indexes. All the other parameters here are the same.
compactor_config: CompactorConfig,
catalog: Arc<dyn Catalog>,
parquet_store: ParquetStorage,
@@ -169,16 +294,22 @@ pub async fn build_compactor_from_config(
time_provider: Arc<dyn TimeProvider>,
metric_registry: Arc<Registry>,
) -> Result<compactor::compact::Compactor, Error> {
// 1. Shard index range checking
// This function checks the shard index range; `compactor2` doesn't have shard indexes so
// `build_compactor2_from_config` doesn't have this check.
if compactor_config.shard_index_range_start > compactor_config.shard_index_range_end {
return Err(Error::ShardIndexRange);
}
// 2. Split percentage value range checking
if compactor_config.split_percentage < 1 || compactor_config.split_percentage > 100 {
return Err(Error::SplitPercentageRange {
split_percentage: compactor_config.split_percentage,
});
}
// 3. Ensure topic and shard indexes are in the catalog
// This isn't relevant to `compactor2`.
let mut txn = catalog.start_transaction().await?;
let topic = txn
.topics()
@@ -199,7 +330,9 @@ pub async fn build_compactor_from_config(
shards.push(s.id);
}
txn.commit().await?;
// 3. END
// 4. Convert config type to handler config type
let CompactorConfig {
max_desired_file_size_bytes,
percentage_max_file_size,
@@ -240,7 +373,9 @@ pub async fn build_compactor_from_config(
warm_compaction_small_size_threshold_bytes,
warm_compaction_min_small_file_count,
};
// 4. END
// 5. Create a new compactor: Assigned only those shards specified in the CLI args.
Ok(compactor::compact::Compactor::new(
shards,
catalog,