diff --git a/clap_blocks/src/querier.rs b/clap_blocks/src/querier.rs index 5dbfe92d1b..9c3aec25ca 100644 --- a/clap_blocks/src/querier.rs +++ b/clap_blocks/src/querier.rs @@ -1,7 +1,7 @@ //! Querier-related configs. use crate::ingester_address::IngesterAddress; -use std::num::NonZeroUsize; +use std::{collections::HashMap, num::NonZeroUsize}; /// CLI config for querier configuration #[derive(Debug, Clone, PartialEq, Eq, clap::Parser)] @@ -96,6 +96,16 @@ pub struct QuerierConfig { action )] pub ingester_circuit_breaker_threshold: u64, + + /// DataFusion config. + #[clap( + long = "datafusion-config", + env = "INFLUXDB_IOX_DATAFUSION_CONFIG", + default_value = "", + value_parser = parse_datafusion_config, + action + )] + pub datafusion_config: HashMap, } impl QuerierConfig { @@ -121,6 +131,37 @@ impl QuerierConfig { } } +fn parse_datafusion_config( + s: &str, +) -> Result, Box> { + let s = s.trim(); + if s.is_empty() { + return Ok(HashMap::with_capacity(0)); + } + + let mut out = HashMap::new(); + for part in s.split(',') { + let kv = part.trim().splitn(2, ':').collect::>(); + match kv.as_slice() { + [key, value] => { + let key_owned = key.trim().to_owned(); + let value_owned = value.trim().to_owned(); + let existed = out.insert(key_owned, value_owned).is_some(); + if existed { + return Err(format!("key '{key}' passed multiple times").into()); + } + } + _ => { + return Err( + format!("Invalid key value pair - expected 'KEY:VALUE' got '{s}'").into(), + ); + } + } + } + + Ok(out) +} + #[cfg(test)] mod tests { use super::*; @@ -133,6 +174,7 @@ mod tests { assert_eq!(actual.num_query_threads(), None); assert!(actual.ingester_addresses.is_empty()); + assert!(actual.datafusion_config.is_empty()); } #[test] @@ -183,4 +225,41 @@ mod tests { Invalid: invalid uri character" ); } + + #[test] + fn test_datafusion_config() { + let actual = QuerierConfig::try_parse_from([ + "my_binary", + "--datafusion-config= foo : bar , x:y:z ", + ]) + .unwrap(); + + assert_eq!( + 
actual.datafusion_config, + HashMap::from([ + (String::from("foo"), String::from("bar")), + (String::from("x"), String::from("y:z")), + ]), + ); + } + + #[test] + fn bad_datafusion_config() { + let actual = QuerierConfig::try_parse_from(["my_binary", "--datafusion-config=foo"]) + .unwrap_err() + .to_string(); + assert_contains!( + actual, + "error: invalid value 'foo' for '--datafusion-config ': Invalid key value pair - expected 'KEY:VALUE' got 'foo'" + ); + + let actual = + QuerierConfig::try_parse_from(["my_binary", "--datafusion-config=foo:bar,baz:1,foo:2"]) + .unwrap_err() + .to_string(); + assert_contains!( + actual, + "error: invalid value 'foo:bar,baz:1,foo:2' for '--datafusion-config ': key 'foo' passed multiple times" + ); + } } diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 4cd619f033..dc21141aed 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -505,6 +505,7 @@ impl Config { max_concurrent_queries: querier_max_concurrent_queries, exec_mem_pool_bytes, ingester_circuit_breaker_threshold: u64::MAX, // never for all-in-one-mode + datafusion_config: Default::default(), }; SpecializedConfig { diff --git a/iox_query/src/config.rs b/iox_query/src/config.rs index 2471ac96b3..1982980ccc 100644 --- a/iox_query/src/config.rs +++ b/iox_query/src/config.rs @@ -1,3 +1,5 @@ +use std::{str::FromStr, time::Duration}; + use datafusion::{common::extensions_options, config::ConfigExtension}; /// IOx-specific config extension prefix. @@ -38,9 +40,55 @@ extensions_options! { /// [`SortExec`]: datafusion::physical_plan::sorts::sort::SortExec /// [`target_partitions`]: datafusion::common::config::ExecutionOptions::target_partitions pub max_parquet_fanout: usize, default = 40 + + /// Cutoff date for InfluxQL metadata queries. 
+ pub influxql_metadata_cutoff: MetadataCutoff, default = MetadataCutoff::Relative(Duration::from_secs(3600 * 24)) } } impl ConfigExtension for IoxConfigExt { const PREFIX: &'static str = IOX_CONFIG_PREFIX; } + +/// Cutoff that is either an absolute datetime or a duration relative to now. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MetadataCutoff { + Absolute(chrono::DateTime), + Relative(Duration), +} + +#[derive(Debug)] +pub struct ParseError(String); + +impl std::fmt::Display for ParseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for ParseError {} + +impl FromStr for MetadataCutoff { + type Err = ParseError; + + fn from_str(s: &str) -> Result { + if let Some(s) = s.strip_prefix('-') { + let delta = u64::from_str(s).map_err(|e| ParseError(e.to_string()))?; + let delta = Duration::from_nanos(delta); + Ok(Self::Relative(delta)) + } else { + let dt = chrono::DateTime::::from_str(s) + .map_err(|e| ParseError(e.to_string()))?; + Ok(Self::Absolute(dt)) + } + } +} + +impl std::fmt::Display for MetadataCutoff { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Relative(delta) => write!(f, "-{}", delta.as_nanos()), + Self::Absolute(dt) => write!(f, "{}", dt), + } + } +} diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 5be5f3cab7..9d62f56a5e 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -9,6 +9,7 @@ use super::{ split::StreamSplitNode, }; use crate::{ + config::IoxConfigExt, exec::{ fieldlist::{FieldList, IntoFieldList}, non_null_checker::NonNullCheckerExec, @@ -52,7 +53,7 @@ use datafusion::{ use datafusion_util::config::{iox_session_config, DEFAULT_CATALOG}; use executor::DedicatedExecutor; use futures::{Stream, StreamExt, TryStreamExt}; -use observability_deps::tracing::debug; +use observability_deps::tracing::{debug, warn}; use query_functions::{register_scalar_functions, 
selectors::register_selector_aggregates}; use std::{convert::TryInto, fmt, num::NonZeroUsize, sync::Arc}; use trace::{ @@ -188,7 +189,11 @@ impl fmt::Debug for IOxSessionConfig { impl IOxSessionConfig { pub(super) fn new(exec: DedicatedExecutor, runtime: Arc) -> Self { - let session_config = iox_session_config(); + let mut session_config = iox_session_config(); + session_config + .options_mut() + .extensions + .insert(IoxConfigExt::default()); Self { exec, @@ -220,6 +225,26 @@ impl IOxSessionConfig { Self { span_ctx, ..self } } + /// Set DataFusion [config option]. + /// + /// May be used to set [IOx-specific] option as well. + /// + /// + /// [config option]: datafusion::common::config::ConfigOptions + /// [IOx-specific]: crate::config::IoxConfigExt + pub fn with_config_option(mut self, key: &str, value: &str) -> Self { + // ignore invalid config + if let Err(e) = self.session_config.options_mut().set(key, value) { + warn!( + key, + value, + %e, + "invalid DataFusion config", + ); + } + self + } + /// Create an ExecutionContext suitable for executing DataFusion plans pub fn build(self) -> IOxSessionContext { let maybe_span = self.span_ctx.child_span("Query Execution"); diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs index 6e7af33884..a55729e4cc 100644 --- a/ioxd_querier/src/lib.rs +++ b/ioxd_querier/src/lib.rs @@ -220,6 +220,7 @@ pub async fn create_querier_server_type( args.exec, ingester_connections, args.querier_config.max_concurrent_queries(), + Arc::new(args.querier_config.datafusion_config), ) .await?, ); diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index 29b8e7d11f..a599a1ae87 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -98,6 +98,8 @@ impl proto::namespace_service_server::NamespaceService for NamespaceServiceImpl #[cfg(test)] mod tests { + use std::collections::HashMap; + use super::*; use 
generated_types::influxdata::iox::namespace::v1::namespace_service_server::NamespaceService; use iox_tests::TestCatalog; @@ -133,6 +135,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + Arc::new(HashMap::default()), ) .await .unwrap(), @@ -168,6 +171,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + Arc::new(HashMap::default()), ) .await .unwrap(), diff --git a/querier/src/database.rs b/querier/src/database.rs index 0874179360..3e3b423902 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -11,7 +11,7 @@ use iox_catalog::interface::SoftDeletedRows; use iox_query::exec::Executor; use service_common::QueryNamespaceProvider; use snafu::Snafu; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use trace::span::{Span, SpanRecorder}; use tracker::{ AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore, @@ -67,6 +67,9 @@ pub struct QuerierDatabase { /// Chunk prune metrics. prune_metrics: Arc, + + /// DataFusion config. 
+ datafusion_config: Arc>, } #[async_trait] @@ -100,6 +103,7 @@ impl QuerierDatabase { exec: Arc, ingester_connection: Option>, max_concurrent_queries: usize, + datafusion_config: Arc>, ) -> Result { assert!( max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX, @@ -134,6 +138,7 @@ impl QuerierDatabase { query_log, query_execution_semaphore, prune_metrics, + datafusion_config, }) } @@ -162,6 +167,7 @@ impl QuerierDatabase { self.ingester_connection.clone(), Arc::clone(&self.query_log), Arc::clone(&self.prune_metrics), + Arc::clone(&self.datafusion_config), ))) } @@ -219,6 +225,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1), + Arc::new(HashMap::default()), ) .await .unwrap(); @@ -243,6 +250,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + Arc::new(HashMap::default()), ) .await .unwrap(); @@ -272,6 +280,7 @@ mod tests { catalog.exec(), Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + Arc::new(HashMap::default()), ) .await .unwrap(); diff --git a/querier/src/handler.rs b/querier/src/handler.rs index d22d9f64a2..ff18e8d559 100644 --- a/querier/src/handler.rs +++ b/querier/src/handler.rs @@ -165,7 +165,7 @@ mod tests { use iox_query::exec::Executor; use iox_time::{MockProvider, Time}; use object_store::memory::InMemory; - use std::time::Duration; + use std::{collections::HashMap, time::Duration}; use tokio::runtime::Handle; #[tokio::test] @@ -224,6 +224,7 @@ mod tests { exec, Some(create_ingester_connection_for_testing()), QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + Arc::new(HashMap::default()), ) .await .unwrap(), diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index 557bf39eed..224c5561e1 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -44,6 +44,9 @@ pub struct QuerierNamespace { /// 
Query log. query_log: Arc, + + /// DataFusion config. + datafusion_config: Arc>, } impl QuerierNamespace { @@ -57,6 +60,7 @@ impl QuerierNamespace { ingester_connection: Option>, query_log: Arc, prune_metrics: Arc, + datafusion_config: Arc>, ) -> Self { let tables: HashMap<_, _> = ns .tables @@ -87,6 +91,7 @@ impl QuerierNamespace { exec, catalog_cache: Arc::clone(chunk_adapter.catalog_cache()), query_log, + datafusion_config, } } @@ -113,6 +118,7 @@ impl QuerierNamespace { ingester_connection, query_log, prune_metrics, + Arc::new(HashMap::default()), ) } diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs index 528a8c3579..2fbc3cbaa5 100644 --- a/querier/src/namespace/query_access.rs +++ b/querier/src/namespace/query_access.rs @@ -186,11 +186,17 @@ impl SchemaProvider for UserSchemaProvider { impl ExecutionContextProvider for QuerierNamespace { fn new_query_context(&self, span_ctx: Option) -> IOxSessionContext { - self.exec + let mut cfg = self + .exec .new_execution_config(ExecutorType::Query) .with_default_catalog(Arc::new(QuerierCatalogProvider::from_namespace(self)) as _) - .with_span_context(span_ctx) - .build() + .with_span_context(span_ctx); + + for (k, v) in self.datafusion_config.as_ref() { + cfg = cfg.with_config_option(k, v); + } + + cfg.build() } }