feat: allow overwriting DataFusion's default config (#7586)

This is helpful for trying out changes to our defaults, and also for testing.

Required for https://github.com/influxdata/idpe/issues/17474.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-04-18 13:28:45 +02:00 committed by GitHub
parent b6c2b00353
commit d7dc305972
10 changed files with 188 additions and 8 deletions
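
As a quick illustration (a hedged sketch mirroring the tests added below; the binary name is the placeholder used in those tests, and the key names here are examples rather than values this commit prescribes), the new flag and its INFLUXDB_IOX_DATAFUSION_CONFIG env var take comma-separated KEY:VALUE pairs:

    use clap::Parser; // brings `try_parse_from` into scope; assumes QuerierConfig is imported

    #[test]
    fn datafusion_config_flag_sketch() {
        let config = QuerierConfig::try_parse_from([
            "my_binary",
            "--datafusion-config=datafusion.execution.target_partitions:4,iox.max_parquet_fanout:40",
        ])
        .unwrap();
        assert_eq!(
            config.datafusion_config.get("iox.max_parquet_fanout"),
            Some(&String::from("40")),
        );
    }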

@@ -1,7 +1,7 @@
//! Querier-related configs.
use crate::ingester_address::IngesterAddress;
-use std::num::NonZeroUsize;
+use std::{collections::HashMap, num::NonZeroUsize};
/// CLI config for querier configuration
#[derive(Debug, Clone, PartialEq, Eq, clap::Parser)]
@@ -96,6 +96,16 @@ pub struct QuerierConfig {
action
)]
pub ingester_circuit_breaker_threshold: u64,
/// DataFusion config.
#[clap(
long = "datafusion-config",
env = "INFLUXDB_IOX_DATAFUSION_CONFIG",
default_value = "",
value_parser = parse_datafusion_config,
action
)]
pub datafusion_config: HashMap<String, String>,
}
impl QuerierConfig {
@@ -121,6 +131,37 @@ impl QuerierConfig {
}
}
fn parse_datafusion_config(
s: &str,
) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let s = s.trim();
if s.is_empty() {
return Ok(HashMap::with_capacity(0));
}
let mut out = HashMap::new();
for part in s.split(',') {
let kv = part.trim().splitn(2, ':').collect::<Vec<_>>();
match kv.as_slice() {
[key, value] => {
let key_owned = key.trim().to_owned();
let value_owned = value.trim().to_owned();
let existed = out.insert(key_owned, value_owned).is_some();
if existed {
return Err(format!("key '{key}' passed multiple times").into());
}
}
_ => {
return Err(
format!("Invalid key value pair - expected 'KEY:VALUE' got '{s}'").into(),
);
}
}
}
Ok(out)
}
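// Note on the parser above: `splitn(2, ':')` splits on the first colon only, so
// a value may itself contain colons ("x:y:z" parses to key "x", value "y:z");
// whitespace around keys and values is trimmed, and duplicate keys or colon-less
// entries are rejected (the colon-less error echoes the full flag value, not
// just the offending pair).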
#[cfg(test)]
mod tests {
use super::*;
@@ -133,6 +174,7 @@ mod tests {
assert_eq!(actual.num_query_threads(), None);
assert!(actual.ingester_addresses.is_empty());
assert!(actual.datafusion_config.is_empty());
}
#[test]
@@ -183,4 +225,41 @@
Invalid: invalid uri character"
);
}
#[test]
fn test_datafusion_config() {
let actual = QuerierConfig::try_parse_from([
"my_binary",
"--datafusion-config= foo : bar , x:y:z ",
])
.unwrap();
assert_eq!(
actual.datafusion_config,
HashMap::from([
(String::from("foo"), String::from("bar")),
(String::from("x"), String::from("y:z")),
]),
);
}
#[test]
fn bad_datafusion_config() {
let actual = QuerierConfig::try_parse_from(["my_binary", "--datafusion-config=foo"])
.unwrap_err()
.to_string();
assert_contains!(
actual,
"error: invalid value 'foo' for '--datafusion-config <DATAFUSION_CONFIG>': Invalid key value pair - expected 'KEY:VALUE' got 'foo'"
);
let actual =
QuerierConfig::try_parse_from(["my_binary", "--datafusion-config=foo:bar,baz:1,foo:2"])
.unwrap_err()
.to_string();
assert_contains!(
actual,
"error: invalid value 'foo:bar,baz:1,foo:2' for '--datafusion-config <DATAFUSION_CONFIG>': key 'foo' passed multiple times"
);
}
}

@@ -505,6 +505,7 @@ impl Config {
max_concurrent_queries: querier_max_concurrent_queries,
exec_mem_pool_bytes,
ingester_circuit_breaker_threshold: u64::MAX, // never for all-in-one-mode
datafusion_config: Default::default(),
};
SpecializedConfig {

@@ -1,3 +1,5 @@
use std::{str::FromStr, time::Duration};
use datafusion::{common::extensions_options, config::ConfigExtension};
/// IOx-specific config extension prefix.
@@ -38,9 +40,55 @@ extensions_options! {
/// [`SortExec`]: datafusion::physical_plan::sorts::sort::SortExec
/// [`target_partitions`]: datafusion::common::config::ExecutionOptions::target_partitions
pub max_parquet_fanout: usize, default = 40
/// Cutoff date for InfluxQL metadata queries.
pub influxql_metadata_cutoff: MetadataCutoff, default = MetadataCutoff::Relative(Duration::from_secs(3600 * 24))
}
}
impl ConfigExtension for IoxConfigExt {
const PREFIX: &'static str = IOX_CONFIG_PREFIX;
}
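// With `ConfigExtension` implemented, the options above become settable through
// DataFusion's generic string-based config path ("<PREFIX>.max_parquet_fanout",
// "<PREFIX>.influxql_metadata_cutoff", ...); the concrete value of
// IOX_CONFIG_PREFIX is defined earlier in this file but elided from the hunk.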
/// Cutoff that is either an absolute datetime or a duration relative to now.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetadataCutoff {
Absolute(chrono::DateTime<chrono::Utc>),
Relative(Duration),
}
#[derive(Debug)]
pub struct ParseError(String);
impl std::fmt::Display for ParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl std::error::Error for ParseError {}
impl FromStr for MetadataCutoff {
type Err = ParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Some(s) = s.strip_prefix('-') {
let delta = u64::from_str(s).map_err(|e| ParseError(e.to_string()))?;
let delta = Duration::from_nanos(delta);
Ok(Self::Relative(delta))
} else {
let dt = chrono::DateTime::<chrono::Utc>::from_str(s)
.map_err(|e| ParseError(e.to_string()))?;
Ok(Self::Absolute(dt))
}
}
}
impl std::fmt::Display for MetadataCutoff {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Relative(delta) => write!(f, "-{}", delta.as_nanos()),
Self::Absolute(dt) => write!(f, "{}", dt),
}
}
}
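// Parsing shape implied by the `FromStr` impl above: a leading '-' selects the
// relative form with a nanosecond count ("-86400000000000" is 24 hours), while
// anything else is handed to chrono ("2023-04-18T00:00:00Z" yields an absolute
// cutoff); `Display` re-emits relative values as "-<nanos>".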

@@ -9,6 +9,7 @@ use super::{
split::StreamSplitNode,
};
use crate::{
config::IoxConfigExt,
exec::{
fieldlist::{FieldList, IntoFieldList},
non_null_checker::NonNullCheckerExec,
@@ -52,7 +53,7 @@ use datafusion::{
use datafusion_util::config::{iox_session_config, DEFAULT_CATALOG};
use executor::DedicatedExecutor;
use futures::{Stream, StreamExt, TryStreamExt};
-use observability_deps::tracing::debug;
+use observability_deps::tracing::{debug, warn};
use query_functions::{register_scalar_functions, selectors::register_selector_aggregates};
use std::{convert::TryInto, fmt, num::NonZeroUsize, sync::Arc};
use trace::{
@@ -188,7 +189,11 @@ impl fmt::Debug for IOxSessionConfig {
impl IOxSessionConfig {
pub(super) fn new(exec: DedicatedExecutor, runtime: Arc<RuntimeEnv>) -> Self {
-let session_config = iox_session_config();
+let mut session_config = iox_session_config();
+session_config
+.options_mut()
+.extensions
+.insert(IoxConfigExt::default());
Self {
exec,
@@ -220,6 +225,26 @@ impl IOxSessionConfig {
Self { span_ctx, ..self }
}
/// Set a DataFusion [config option].
///
/// May be used to set [IOx-specific] options as well.
///
/// [config option]: datafusion::common::config::ConfigOptions
/// [IOx-specific]: crate::config::IoxConfigExt
pub fn with_config_option(mut self, key: &str, value: &str) -> Self {
// ignore invalid config
if let Err(e) = self.session_config.options_mut().set(key, value) {
warn!(
key,
value,
%e,
"invalid DataFusion config",
);
}
self
}
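// Hypothetical call site (not established by this hunk): callers may chain e.g.
// `.with_config_option("datafusion.execution.batch_size", "1024")`; an unknown
// key or unparsable value only emits the `warn!` above and leaves the session
// config unchanged.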
/// Create an ExecutionContext suitable for executing DataFusion plans
pub fn build(self) -> IOxSessionContext {
let maybe_span = self.span_ctx.child_span("Query Execution");

@@ -220,6 +220,7 @@ pub async fn create_querier_server_type(
args.exec,
ingester_connections,
args.querier_config.max_concurrent_queries(),
Arc::new(args.querier_config.datafusion_config),
)
.await?,
);

@@ -98,6 +98,8 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::*;
use generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService;
use iox_tests::TestCatalog;
@@ -133,6 +135,7 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
Arc::new(HashMap::default()),
)
.await
.unwrap(),
@@ -168,6 +171,7 @@
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
Arc::new(HashMap::default()),
)
.await
.unwrap(),

@@ -11,7 +11,7 @@ use iox_catalog::interface::SoftDeletedRows;
use iox_query::exec::Executor;
use service_common::QueryNamespaceProvider;
use snafu::Snafu;
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
use trace::span::{Span, SpanRecorder};
use tracker::{
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
@@ -67,6 +67,9 @@ pub struct QuerierDatabase {
/// Chunk prune metrics.
prune_metrics: Arc<PruneMetrics>,
/// DataFusion config.
datafusion_config: Arc<HashMap<String, String>>,
}
#[async_trait]
@@ -100,6 +103,7 @@ impl QuerierDatabase {
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
max_concurrent_queries: usize,
datafusion_config: Arc<HashMap<String, String>>,
) -> Result<Self, Error> {
assert!(
max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX,
@@ -134,6 +138,7 @@
query_log,
query_execution_semaphore,
prune_metrics,
datafusion_config,
})
}
@@ -162,6 +167,7 @@ impl QuerierDatabase {
self.ingester_connection.clone(),
Arc::clone(&self.query_log),
Arc::clone(&self.prune_metrics),
Arc::clone(&self.datafusion_config),
)))
}
@@ -219,6 +225,7 @@ mod tests {
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1),
Arc::new(HashMap::default()),
)
.await
.unwrap();
@@ -243,6 +250,7 @@
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
Arc::new(HashMap::default()),
)
.await
.unwrap();
@@ -272,6 +280,7 @@
catalog.exec(),
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
Arc::new(HashMap::default()),
)
.await
.unwrap();

@@ -165,7 +165,7 @@ mod tests {
use iox_query::exec::Executor;
use iox_time::{MockProvider, Time};
use object_store::memory::InMemory;
-use std::time::Duration;
+use std::{collections::HashMap, time::Duration};
use tokio::runtime::Handle;
#[tokio::test]
@@ -224,6 +224,7 @@
exec,
Some(create_ingester_connection_for_testing()),
QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX,
Arc::new(HashMap::default()),
)
.await
.unwrap(),

@@ -44,6 +44,9 @@ pub struct QuerierNamespace {
/// Query log.
query_log: Arc<QueryLog>,
/// DataFusion config.
datafusion_config: Arc<HashMap<String, String>>,
}
impl QuerierNamespace {
@@ -57,6 +60,7 @@ impl QuerierNamespace {
ingester_connection: Option<Arc<dyn IngesterConnection>>,
query_log: Arc<QueryLog>,
prune_metrics: Arc<PruneMetrics>,
datafusion_config: Arc<HashMap<String, String>>,
) -> Self {
let tables: HashMap<_, _> = ns
.tables
@@ -87,6 +91,7 @@
exec,
catalog_cache: Arc::clone(chunk_adapter.catalog_cache()),
query_log,
datafusion_config,
}
}
@@ -113,6 +118,7 @@ impl QuerierNamespace {
ingester_connection,
query_log,
prune_metrics,
Arc::new(HashMap::default()),
)
}

@@ -186,11 +186,17 @@ impl SchemaProvider for UserSchemaProvider {
impl ExecutionContextProvider for QuerierNamespace {
fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxSessionContext {
-self.exec
+let mut cfg = self
+.exec
.new_execution_config(ExecutorType::Query)
.with_default_catalog(Arc::new(QuerierCatalogProvider::from_namespace(self)) as _)
-.with_span_context(span_ctx)
-.build()
+.with_span_context(span_ctx);
+for (k, v) in self.datafusion_config.as_ref() {
+cfg = cfg.with_config_option(k, v);
+}
+cfg.build()
}
}
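
Taken together, the parsed CLI map travels as an Arc<HashMap<String, String>> from QuerierConfig through QuerierDatabase and QuerierNamespace, and is applied pair by pair when each query context is built. A minimal standalone sketch of that final step (placeholder data; the real code is the loop in the hunk above):

    use std::{collections::HashMap, sync::Arc};

    fn main() {
        let datafusion_config: Arc<HashMap<String, String>> = Arc::new(HashMap::from([(
            String::from("datafusion.execution.target_partitions"),
            String::from("4"),
        )]));
        // In the patch this loop calls `cfg = cfg.with_config_option(k, v)` on
        // the session config builder; here we only show the iteration shape.
        for (k, v) in datafusion_config.as_ref() {
            println!("would set {k} = {v}");
        }
    }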