diff --git a/Cargo.lock b/Cargo.lock index 5eb65621d9..527a0e6eda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2943,11 +2943,14 @@ name = "influxdb3_clap_blocks" version = "0.1.0" dependencies = [ "clap", + "datafusion", "futures", "humantime", + "iox_query", "libc", "observability_deps", "paste", + "test-log", "tokio", ] diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 87f235aab5..3f5f16fd25 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -12,7 +12,7 @@ use influxdb3_cache::{ meta_cache::MetaCacheProvider, parquet_cache::create_cached_obj_store_and_oracle, }; -use influxdb3_clap_blocks::tokio::TokioDatafusionConfig; +use influxdb3_clap_blocks::{datafusion::IoxQueryDatafusionConfig, tokio::TokioDatafusionConfig}; use influxdb3_process::{ build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID, }; @@ -36,8 +36,8 @@ use object_store::ObjectStore; use observability_deps::tracing::*; use panic_logging::SendPanicsToTracing; use parquet_file::storage::{ParquetStorage, StorageId}; -use std::{collections::HashMap, path::Path, str::FromStr}; use std::{num::NonZeroUsize, sync::Arc}; +use std::{path::Path, str::FromStr}; use thiserror::Error; use tokio::net::TcpListener; use tokio::time::Instant; @@ -112,6 +112,10 @@ pub struct Config { #[clap(flatten)] pub(crate) tokio_datafusion_config: TokioDatafusionConfig, + /// iox_query extended DataFusion config + #[clap(flatten)] + pub(crate) iox_query_datafusion_config: IoxQueryDatafusionConfig, + /// Maximum size of HTTP requests. #[clap( long = "max-http-request-size", @@ -152,16 +156,6 @@ pub struct Config { )] pub exec_mem_pool_bytes: MemorySize, - /// DataFusion config. - #[clap( - long = "datafusion-config", - env = "INFLUXDB_IOX_DATAFUSION_CONFIG", - default_value = "", - value_parser = parse_datafusion_config, - action - )] - pub datafusion_config: HashMap, - /// bearer token to be set for requests #[clap(long = "bearer-token", env = "INFLUXDB3_BEARER_TOKEN", action)] pub bearer_token: Option, @@ -514,7 +508,7 @@ pub async fn command(config: Config) -> Result<()> { write_buffer: Arc::clone(&write_buffer), exec: Arc::clone(&exec), metrics: Arc::clone(&metrics), - datafusion_config: Arc::new(config.datafusion_config), + datafusion_config: Arc::new(config.iox_query_datafusion_config.build()), query_log_size: config.query_log_size, telemetry_store: Arc::clone(&telemetry_store), sys_events_store: Arc::clone(&sys_events_store), @@ -572,34 +566,3 @@ async fn setup_telemetry_store( ) .await } - -fn parse_datafusion_config( - s: &str, -) -> Result, Box> { - let s = s.trim(); - if s.is_empty() { - return Ok(HashMap::with_capacity(0)); - } - - let mut out = HashMap::new(); - for part in s.split(',') { - let kv = part.trim().splitn(2, ':').collect::>(); - match kv.as_slice() { - [key, value] => { - let key_owned = key.trim().to_owned(); - let value_owned = value.trim().to_owned(); - let existed = out.insert(key_owned, value_owned).is_some(); - if existed { - return Err(format!("key '{key}' passed multiple times").into()); - } - } - _ => { - return Err( - format!("Invalid key value pair - expected 'KEY:VALUE' got '{s}'").into(), - ); - } - } - } - - Ok(out) -} diff --git a/influxdb3_clap_blocks/Cargo.toml b/influxdb3_clap_blocks/Cargo.toml index 5bdc24393f..25da081dd7 100644 --- a/influxdb3_clap_blocks/Cargo.toml +++ b/influxdb3_clap_blocks/Cargo.toml @@ -7,10 +7,12 @@ license.workspace = true [dependencies] # core crate dependencies +iox_query.workspace = true observability_deps.workspace = true # crates.io dependencies clap.workspace = true +datafusion.workspace = true humantime.workspace = true libc.workspace = true paste.workspace = true @@ -18,6 +20,7 @@ tokio.workspace = true [dev-dependencies] futures.workspace = true +test-log.workspace = true [lints] workspace = true diff --git a/influxdb3_clap_blocks/src/datafusion.rs b/influxdb3_clap_blocks/src/datafusion.rs new file mode 100644 index 0000000000..1ccb2e5c87 --- /dev/null +++ b/influxdb3_clap_blocks/src/datafusion.rs @@ -0,0 +1,125 @@ +use std::collections::HashMap; + +use datafusion::config::ConfigExtension; +use iox_query::config::IoxConfigExt; + +/// Extends the standard [`HashMap`] based DataFusion config option in the CLI with specific +/// options (along with defaults) for InfluxDB 3 OSS/Pro. This is intended for customization of +/// options that are defined in the `iox_query` crate, e.g., those defined in [`IoxConfigExt`] +/// that are relevant to the monolithinc versions of InfluxDB 3. +#[derive(Debug, clap::Parser, Clone)] +pub struct IoxQueryDatafusionConfig { + /// When multiple parquet files are required in a sorted way (e.g. for de-duplication), we have + /// two options: + /// + /// 1. **In-mem sorting:** Put them into `datafusion.target_partitions` DataFusion partitions. + /// This limits the fan-out, but requires that we potentially chain multiple parquet files into + /// a single DataFusion partition. Since chaining sorted data does NOT automatically result in + /// sorted data (e.g. AB-AB is not sorted), we need to preform an in-memory sort using + /// `SortExec` afterwards. This is expensive. + /// 2. **Fan-out:** Instead of chaining files within DataFusion partitions, we can accept a + /// fan-out beyond `target_partitions`. This prevents in-memory sorting but may result in OOMs + /// (out-of-memory) if the fan-out is too large. + /// + /// We try to pick option 2 up to a certain number of files, which is configured by this + /// setting. + #[clap( + long = "datafusion-max-parquet-fanout", + env = "INFLUXDB3_DATAFUSION_MAX_PARQUET_FANOUT", + default_value = "1000", + action + )] + pub max_parquet_fanout: usize, + + /// Provide custom configuration to DataFusion as a comma-separated list of key:value pairs. + /// + /// # Example + /// ```text + /// --datafusion-config "datafusion.key1:value1, datafusion.key2:value2" + /// ``` + #[clap( + long = "datafusion-config", + env = "INFLUXDB3_DATAFUSION_CONFIG", + default_value = "", + value_parser = parse_datafusion_config, + action + )] + pub datafusion_config: HashMap, +} + +impl IoxQueryDatafusionConfig { + /// Build a [`HashMap`] to be used as the DataFusion config for the query executor + /// + /// This takes the provided `--datafusion-config` and extends it with options available on this + /// [`IoxQueryDatafusionConfig`] struct. Note, any IOx extension parameters that are defined + /// in the `datafusion_config` will be overridden by the provided values or their default. For + /// example, if the user provides: + /// ``` + /// --datafusion-config "iox.max_arquet_fanout:50" + /// ``` + /// This will be overridden with with the default value for `max_parquet_fanout` of `1000`, or + /// with the value provided for the `--datafusion-max-parquet-fanout` argument. + pub fn build(mut self) -> HashMap { + self.datafusion_config.insert( + format!("{prefix}.max_parquet_fanout", prefix = IoxConfigExt::PREFIX), + self.max_parquet_fanout.to_string(), + ); + self.datafusion_config + } +} + +fn parse_datafusion_config( + s: &str, +) -> Result, Box> { + let s = s.trim(); + if s.is_empty() { + return Ok(HashMap::with_capacity(0)); + } + + let mut out = HashMap::new(); + for part in s.split(',') { + let kv = part.trim().splitn(2, ':').collect::>(); + match kv.as_slice() { + [key, value] => { + let key_owned = key.trim().to_owned(); + let value_owned = value.trim().to_owned(); + let existed = out.insert(key_owned, value_owned).is_some(); + if existed { + return Err(format!("key '{key}' passed multiple times").into()); + } + } + _ => { + return Err( + format!("Invalid key value pair - expected 'KEY:VALUE' got '{s}'").into(), + ); + } + } + } + + Ok(out) +} + +#[cfg(test)] +mod tests { + use clap::Parser; + use iox_query::{config::IoxConfigExt, exec::Executor}; + + use super::IoxQueryDatafusionConfig; + + #[test_log::test] + fn max_parquet_fanout() { + let datafusion_config = + IoxQueryDatafusionConfig::parse_from(["", "--datafusion-max-parquet-fanout", "5"]) + .build(); + let exec = Executor::new_testing(); + let mut session_config = exec.new_session_config(); + for (k, v) in &datafusion_config { + session_config = session_config.with_config_option(k, v); + } + let ctx = session_config.build(); + let inner_ctx = ctx.inner().state(); + let config = inner_ctx.config(); + let iox_config_ext = config.options().extensions.get::().unwrap(); + assert_eq!(5, iox_config_ext.max_parquet_fanout); + } +} diff --git a/influxdb3_clap_blocks/src/lib.rs b/influxdb3_clap_blocks/src/lib.rs index fcd0114403..f8d7844a29 100644 --- a/influxdb3_clap_blocks/src/lib.rs +++ b/influxdb3_clap_blocks/src/lib.rs @@ -1,3 +1,4 @@ //! Configuration options for the `influxdb3` CLI which uses the `clap` crate +pub mod datafusion; pub mod tokio;