feat: cli arg to specify max parquet fanout (#25714)
This allows the `max_parquet_fanout` to be specified in the CLI for the `influxdb3 serve` command. This could be done previously via the `--datafusion-config` CLI argument, but the drawbacks to that were: 1. that is a fairly advanced option given the available key/value pairs are not well documented 2. if `iox.max_parquet_fanout` was not provided to that argument, the default would be set to `40` This PR maintains the existing `--datafusion-config` CLI argument (with one caveat, see below) which allows users to provide a set of key/value pairs that will be used to build the internal DataFusion config, but in addition provides the `--datafusion-max-parquet-fanout` argument: ``` --datafusion-max-parquet-fanout <MAX_PARQUET_FANOUT> When multiple parquet files are required in a sorted way (e.g. for de-duplication), we have two options: 1. **In-mem sorting:** Put them into `datafusion.target_partitions` DataFusion partitions. This limits the fan-out, but requires that we potentially chain multiple parquet files into a single DataFusion partition. Since chaining sorted data does NOT automatically result in sorted data (e.g. AB-AB is not sorted), we need to perform an in-memory sort using `SortExec` afterwards. This is expensive. 2. **Fan-out:** Instead of chaining files within DataFusion partitions, we can accept a fan-out beyond `target_partitions`. This prevents in-memory sorting but may result in OOMs (out-of-memory) if the fan-out is too large. We try to pick option 2 up to a certain number of files, which is configured by this setting. [env: INFLUXDB3_DATAFUSION_MAX_PARQUET_FANOUT=] [default: 1000] ``` with the default value of `1000`, which will override the core `iox_query` default of `40`. A test was added to check that this is propagated down to the `IOxSessionContext` that is used during queries.
The only change to the `datafusion-config` CLI argument was to rename `INFLUXDB_IOX` in the environment variable to `INFLUXDB3`: ``` --datafusion-config <DATAFUSION_CONFIG> Provide custom configuration to DataFusion as a comma-separated list of key:value pairs. # Example ```text --datafusion-config "datafusion.key1:value1, datafusion.key2:value2" ``` [env: INFLUXDB3_DATAFUSION_CONFIG=] [default: ] ```pull/25723/head
parent
96a580f365
commit
03ea565802
|
@ -2943,11 +2943,14 @@ name = "influxdb3_clap_blocks"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"datafusion",
|
||||
"futures",
|
||||
"humantime",
|
||||
"iox_query",
|
||||
"libc",
|
||||
"observability_deps",
|
||||
"paste",
|
||||
"test-log",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ use influxdb3_cache::{
|
|||
meta_cache::MetaCacheProvider,
|
||||
parquet_cache::create_cached_obj_store_and_oracle,
|
||||
};
|
||||
use influxdb3_clap_blocks::tokio::TokioDatafusionConfig;
|
||||
use influxdb3_clap_blocks::{datafusion::IoxQueryDatafusionConfig, tokio::TokioDatafusionConfig};
|
||||
use influxdb3_process::{
|
||||
build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID,
|
||||
};
|
||||
|
@ -36,8 +36,8 @@ use object_store::ObjectStore;
|
|||
use observability_deps::tracing::*;
|
||||
use panic_logging::SendPanicsToTracing;
|
||||
use parquet_file::storage::{ParquetStorage, StorageId};
|
||||
use std::{collections::HashMap, path::Path, str::FromStr};
|
||||
use std::{num::NonZeroUsize, sync::Arc};
|
||||
use std::{path::Path, str::FromStr};
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::time::Instant;
|
||||
|
@ -112,6 +112,10 @@ pub struct Config {
|
|||
#[clap(flatten)]
|
||||
pub(crate) tokio_datafusion_config: TokioDatafusionConfig,
|
||||
|
||||
/// iox_query extended DataFusion config
|
||||
#[clap(flatten)]
|
||||
pub(crate) iox_query_datafusion_config: IoxQueryDatafusionConfig,
|
||||
|
||||
/// Maximum size of HTTP requests.
|
||||
#[clap(
|
||||
long = "max-http-request-size",
|
||||
|
@ -152,16 +156,6 @@ pub struct Config {
|
|||
)]
|
||||
pub exec_mem_pool_bytes: MemorySize,
|
||||
|
||||
/// DataFusion config.
|
||||
#[clap(
|
||||
long = "datafusion-config",
|
||||
env = "INFLUXDB_IOX_DATAFUSION_CONFIG",
|
||||
default_value = "",
|
||||
value_parser = parse_datafusion_config,
|
||||
action
|
||||
)]
|
||||
pub datafusion_config: HashMap<String, String>,
|
||||
|
||||
/// bearer token to be set for requests
|
||||
#[clap(long = "bearer-token", env = "INFLUXDB3_BEARER_TOKEN", action)]
|
||||
pub bearer_token: Option<String>,
|
||||
|
@ -514,7 +508,7 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
write_buffer: Arc::clone(&write_buffer),
|
||||
exec: Arc::clone(&exec),
|
||||
metrics: Arc::clone(&metrics),
|
||||
datafusion_config: Arc::new(config.datafusion_config),
|
||||
datafusion_config: Arc::new(config.iox_query_datafusion_config.build()),
|
||||
query_log_size: config.query_log_size,
|
||||
telemetry_store: Arc::clone(&telemetry_store),
|
||||
sys_events_store: Arc::clone(&sys_events_store),
|
||||
|
@ -572,34 +566,3 @@ async fn setup_telemetry_store(
|
|||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Parse a comma-separated list of `KEY:VALUE` pairs into a [`HashMap`].
///
/// Used as the `clap` value parser for `--datafusion-config`. Surrounding
/// whitespace on the input, on each pair, and on each key/value is trimmed.
/// An empty (or all-whitespace) input produces an empty map.
///
/// # Errors
///
/// Fails when a segment lacks a `:` separator or when the same key appears
/// more than once.
fn parse_datafusion_config(
    s: &str,
) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync + 'static>> {
    let trimmed = s.trim();
    if trimmed.is_empty() {
        return Ok(HashMap::with_capacity(0));
    }

    let mut pairs = HashMap::new();
    for entry in trimmed.split(',') {
        // `split_once` leaves any further ':' inside the value, matching the
        // two-element `splitn` split.
        let Some((key, value)) = entry.trim().split_once(':') else {
            return Err(
                format!("Invalid key value pair - expected 'KEY:VALUE' got '{trimmed}'").into(),
            );
        };
        let key = key.trim();
        if pairs.insert(key.to_owned(), value.trim().to_owned()).is_some() {
            return Err(format!("key '{key}' passed multiple times").into());
        }
    }

    Ok(pairs)
}
|
||||
|
|
|
@ -7,10 +7,12 @@ license.workspace = true
|
|||
|
||||
[dependencies]
|
||||
# core crate dependencies
|
||||
iox_query.workspace = true
|
||||
observability_deps.workspace = true
|
||||
|
||||
# crates.io dependencies
|
||||
clap.workspace = true
|
||||
datafusion.workspace = true
|
||||
humantime.workspace = true
|
||||
libc.workspace = true
|
||||
paste.workspace = true
|
||||
|
@ -18,6 +20,7 @@ tokio.workspace = true
|
|||
|
||||
[dev-dependencies]
|
||||
futures.workspace = true
|
||||
test-log.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
|
|
@ -0,0 +1,125 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use datafusion::config::ConfigExtension;
|
||||
use iox_query::config::IoxConfigExt;
|
||||
|
||||
/// Extends the standard [`HashMap`] based DataFusion config option in the CLI with specific
|
||||
/// options (along with defaults) for InfluxDB 3 OSS/Pro. This is intended for customization of
|
||||
/// options that are defined in the `iox_query` crate, e.g., those defined in [`IoxConfigExt`]
|
||||
/// that are relevant to the monolithinc versions of InfluxDB 3.
|
||||
#[derive(Debug, clap::Parser, Clone)]
|
||||
pub struct IoxQueryDatafusionConfig {
|
||||
/// When multiple parquet files are required in a sorted way (e.g. for de-duplication), we have
|
||||
/// two options:
|
||||
///
|
||||
/// 1. **In-mem sorting:** Put them into `datafusion.target_partitions` DataFusion partitions.
|
||||
/// This limits the fan-out, but requires that we potentially chain multiple parquet files into
|
||||
/// a single DataFusion partition. Since chaining sorted data does NOT automatically result in
|
||||
/// sorted data (e.g. AB-AB is not sorted), we need to preform an in-memory sort using
|
||||
/// `SortExec` afterwards. This is expensive.
|
||||
/// 2. **Fan-out:** Instead of chaining files within DataFusion partitions, we can accept a
|
||||
/// fan-out beyond `target_partitions`. This prevents in-memory sorting but may result in OOMs
|
||||
/// (out-of-memory) if the fan-out is too large.
|
||||
///
|
||||
/// We try to pick option 2 up to a certain number of files, which is configured by this
|
||||
/// setting.
|
||||
#[clap(
|
||||
long = "datafusion-max-parquet-fanout",
|
||||
env = "INFLUXDB3_DATAFUSION_MAX_PARQUET_FANOUT",
|
||||
default_value = "1000",
|
||||
action
|
||||
)]
|
||||
pub max_parquet_fanout: usize,
|
||||
|
||||
/// Provide custom configuration to DataFusion as a comma-separated list of key:value pairs.
|
||||
///
|
||||
/// # Example
|
||||
/// ```text
|
||||
/// --datafusion-config "datafusion.key1:value1, datafusion.key2:value2"
|
||||
/// ```
|
||||
#[clap(
|
||||
long = "datafusion-config",
|
||||
env = "INFLUXDB3_DATAFUSION_CONFIG",
|
||||
default_value = "",
|
||||
value_parser = parse_datafusion_config,
|
||||
action
|
||||
)]
|
||||
pub datafusion_config: HashMap<String, String>,
|
||||
}
|
||||
|
||||
impl IoxQueryDatafusionConfig {
|
||||
/// Build a [`HashMap`] to be used as the DataFusion config for the query executor
|
||||
///
|
||||
/// This takes the provided `--datafusion-config` and extends it with options available on this
|
||||
/// [`IoxQueryDatafusionConfig`] struct. Note, any IOx extension parameters that are defined
|
||||
/// in the `datafusion_config` will be overridden by the provided values or their default. For
|
||||
/// example, if the user provides:
|
||||
/// ```
|
||||
/// --datafusion-config "iox.max_arquet_fanout:50"
|
||||
/// ```
|
||||
/// This will be overridden with with the default value for `max_parquet_fanout` of `1000`, or
|
||||
/// with the value provided for the `--datafusion-max-parquet-fanout` argument.
|
||||
pub fn build(mut self) -> HashMap<String, String> {
|
||||
self.datafusion_config.insert(
|
||||
format!("{prefix}.max_parquet_fanout", prefix = IoxConfigExt::PREFIX),
|
||||
self.max_parquet_fanout.to_string(),
|
||||
);
|
||||
self.datafusion_config
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a comma-separated list of `KEY:VALUE` pairs into a [`HashMap`].
///
/// Used as the `clap` value parser for `--datafusion-config`. Surrounding
/// whitespace on the input, on each pair, and on each key/value is trimmed.
/// An empty (or all-whitespace) input produces an empty map.
///
/// # Errors
///
/// Fails when a segment lacks a `:` separator or when the same key appears
/// more than once.
fn parse_datafusion_config(
    s: &str,
) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync + 'static>> {
    let s = s.trim();
    if s.is_empty() {
        return Ok(HashMap::with_capacity(0));
    }

    let mut out = HashMap::new();
    for part in s.split(',') {
        let part = part.trim();
        // `split_once` keeps any further ':' inside the value, matching the
        // previous `splitn(2, ':')` behavior.
        let Some((key, value)) = part.split_once(':') else {
            // Report only the offending segment (not the whole input) so the
            // user can locate the bad pair in a long list.
            return Err(
                format!("Invalid key value pair - expected 'KEY:VALUE' got '{part}'").into(),
            );
        };
        let key = key.trim();
        if out.insert(key.to_owned(), value.trim().to_owned()).is_some() {
            return Err(format!("key '{key}' passed multiple times").into());
        }
    }

    Ok(out)
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use clap::Parser;
    use iox_query::{config::IoxConfigExt, exec::Executor};

    use super::IoxQueryDatafusionConfig;

    /// Verify that `--datafusion-max-parquet-fanout` is propagated through the
    /// built key/value map into the `IoxConfigExt` extension of the DataFusion
    /// session context used for queries.
    #[test_log::test]
    fn max_parquet_fanout() {
        // Parse CLI args with a non-default fanout and flatten them into the
        // config map the server hands to the query executor.
        let datafusion_config =
            IoxQueryDatafusionConfig::parse_from(["", "--datafusion-max-parquet-fanout", "5"])
                .build();
        let exec = Executor::new_testing();
        let mut session_config = exec.new_session_config();
        // Apply each entry as a DataFusion config option, mirroring server startup.
        for (k, v) in &datafusion_config {
            session_config = session_config.with_config_option(k, v);
        }
        let ctx = session_config.build();
        let inner_ctx = ctx.inner().state();
        let config = inner_ctx.config();
        // The iox extension registered on the context should reflect the CLI value.
        let iox_config_ext = config.options().extensions.get::<IoxConfigExt>().unwrap();
        assert_eq!(5, iox_config_ext.max_parquet_fanout);
    }
}
|
|
@ -1,3 +1,4 @@
|
|||
//! Configuration options for the `influxdb3` CLI which uses the `clap` crate
|
||||
|
||||
pub mod datafusion;
|
||||
pub mod tokio;
|
||||
|
|
Loading…
Reference in New Issue