diff --git a/Cargo.lock b/Cargo.lock index d23d870953..39e5d478b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3084,6 +3084,7 @@ dependencies = [ "hyper 0.14.31", "influxdb3_cache", "influxdb3_catalog", + "influxdb3_clap_blocks", "influxdb3_client", "influxdb3_process", "influxdb3_server", @@ -3184,6 +3185,19 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "influxdb3_clap_blocks" +version = "0.1.0" +dependencies = [ + "clap", + "futures", + "humantime", + "libc", + "observability_deps", + "paste", + "tokio", +] + [[package]] name = "influxdb3_client" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 6ef82b45fe..e91aa1aab2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = [ "influxdb3", "influxdb3_cache", - "influxdb3_catalog", + "influxdb3_catalog", "influxdb3_clap_blocks", "influxdb3_client", "influxdb3_id", "influxdb3_load_generator", @@ -82,6 +82,7 @@ mockall = { version = "0.13.0" } num_cpus = "1.16.0" object_store = "0.11.1" parking_lot = "0.12.1" +paste = "1.0.15" parquet = { version = "53.0.0", features = ["object_store"] } pbjson = "0.6.0" pbjson-build = "0.6.2" diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index 068ce90244..457163c454 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -27,6 +27,7 @@ trogging.workspace = true influxdb3_cache = { path = "../influxdb3_cache" } influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_client = { path = "../influxdb3_client" } +influxdb3_clap_blocks = { path = "../influxdb3_clap_blocks" } influxdb3_process = { path = "../influxdb3_process", default-features = false } influxdb3_server = { path = "../influxdb3_server" } influxdb3_wal = { path = "../influxdb3_wal" } diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 0d709accc3..d72b56c35f 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -5,7 +5,6 @@ use clap_blocks::{ memory_size::MemorySize, object_store::{make_object_store, ObjectStoreConfig, ObjectStoreType}, socket_addr::SocketAddr, - tokio::TokioDatafusionConfig, }; use datafusion_util::config::register_iox_object_store; use influxdb3_cache::{ @@ -13,6 +12,7 @@ use influxdb3_cache::{ meta_cache::MetaCacheProvider, parquet_cache::create_cached_obj_store_and_oracle, }; +use influxdb3_clap_blocks::tokio::TokioDatafusionConfig; use influxdb3_process::{ build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID, }; diff --git a/influxdb3/src/main.rs b/influxdb3/src/main.rs index 97d5f3663c..d96ec6df03 100644 --- a/influxdb3/src/main.rs +++ b/influxdb3/src/main.rs @@ -11,13 +11,9 @@ clippy::future_not_send )] use dotenvy::dotenv; +use influxdb3_clap_blocks::tokio::TokioIoConfig; use influxdb3_process::VERSION_STRING; use observability_deps::tracing::warn; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use tokio::runtime::Runtime; use trogging::{ cli::LoggingConfigBuilderExt, tracing_subscriber::{prelude::*, Registry}, @@ -69,6 +65,9 @@ Examples: "# )] struct Config { + #[clap(flatten)] + runtime_config: TokioIoConfig, + #[clap(subcommand)] command: Option, } @@ -113,7 +112,8 @@ fn main() -> Result<(), std::io::Error> { let config: Config = clap::Parser::parse(); - let tokio_runtime = get_runtime(None)?; + let tokio_runtime = config.runtime_config.builder()?.build()?; + tokio_runtime.block_on(async move { fn handle_init_logs(r: Result) -> TroggingGuard { match r { @@ -183,43 +183,6 @@ fn main() -> Result<(), std::io::Error> { Ok(()) } -/// Creates the tokio runtime for executing IOx -/// -/// if nthreads is none, uses the default scheduler -/// otherwise, creates a scheduler with the number of threads -fn get_runtime(num_threads: Option) -> Result { - // NOTE: no log macros will work here! - // - // That means use eprintln!() instead of error!() and so on. The log emitter - // requires a running tokio runtime and is initialised after this function. - - use tokio::runtime::Builder; - let kind = std::io::ErrorKind::Other; - match num_threads { - None => Runtime::new(), - Some(num_threads) => { - println!("Setting number of threads to '{num_threads}' per command line request"); - - let thread_counter = Arc::new(AtomicUsize::new(1)); - match num_threads { - 0 => { - let msg = - format!("Invalid num-threads: '{num_threads}' must be greater than zero"); - Err(std::io::Error::new(kind, msg)) - } - 1 => Builder::new_current_thread().enable_all().build(), - _ => Builder::new_multi_thread() - .enable_all() - .thread_name_fn(move || { - format!("IOx main {}", thread_counter.fetch_add(1, Ordering::SeqCst)) - }) - .worker_threads(num_threads) - .build(), - } - } - } -} - /// Source the .env file before initialising the Config struct - this sets /// any envs in the file, which the Config struct then uses. /// diff --git a/influxdb3_clap_blocks/Cargo.toml b/influxdb3_clap_blocks/Cargo.toml new file mode 100644 index 0000000000..5bdc24393f --- /dev/null +++ b/influxdb3_clap_blocks/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "influxdb3_clap_blocks" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +# core crate dependencies +observability_deps.workspace = true + +# crates.io dependencies +clap.workspace = true +humantime.workspace = true +libc.workspace = true +paste.workspace = true +tokio.workspace = true + +[dev-dependencies] +futures.workspace = true + +[lints] +workspace = true diff --git a/influxdb3_clap_blocks/src/lib.rs b/influxdb3_clap_blocks/src/lib.rs new file mode 100644 index 0000000000..fcd0114403 --- /dev/null +++ b/influxdb3_clap_blocks/src/lib.rs @@ -0,0 +1,3 @@ +//! Configuration options for the `influxdb3` CLI which uses the `clap` crate + +pub mod tokio; diff --git a/influxdb3_clap_blocks/src/tokio.rs b/influxdb3_clap_blocks/src/tokio.rs new file mode 100644 index 0000000000..39a9d5db3a --- /dev/null +++ b/influxdb3_clap_blocks/src/tokio.rs @@ -0,0 +1,372 @@ +//! Config for the tokio main IO and DataFusion runtimes. + +use std::{ + num::{NonZeroU32, NonZeroUsize}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +use paste::paste; + +/// Tokio runtime type. +#[derive(Debug, Clone, Copy, Default, clap::ValueEnum)] +pub enum TokioRuntimeType { + /// Current-thread runtime. + CurrentThread, + + /// Multi-thread runtime. + #[default] + MultiThread, + + /// New, alternative multi-thread runtime. + /// + /// Requires `tokio_unstable` compile-time flag. + MultiThreadAlt, +} + +#[cfg(unix)] +fn set_current_thread_priority(prio: i32) { + // on linux setpriority sets the current thread's priority + // (as opposed to the current process). + unsafe { libc::setpriority(0, 0, prio) }; +} + +macro_rules! tokio_rt_config { + ( + name = $name:ident , + num_threads_arg = $num_threads_arg:expr , + num_threads_env = $num_threads_env:expr , + default_thread_priority = $default_thread_priority:expr, + ) => { + paste! { + #[doc = "CLI config for tokio " $name " runtime."] + #[derive(Debug, Clone, clap::Parser)] + #[allow(missing_copy_implementations)] + pub struct [] { + #[doc = "Set the maximum number of " $name " runtime threads to use."] + #[doc = ""] + #[doc = "Defaults to the number of logical cores on the system."] + #[clap( + id = concat!(stringify!([<$name:lower>]), "_runtime_num_threads"), + long = $num_threads_arg, + env = $num_threads_env, + action + )] + pub num_threads: Option, + + #[doc = $name " tokio runtime type."] + #[clap( + id = concat!(stringify!([<$name:lower>]), "_runtime_type"), + long = concat!(stringify!([<$name:lower>]), "-runtime-type"), + env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_TYPE"), + default_value_t = TokioRuntimeType::default(), + value_enum, + action + )] + pub runtime_type: TokioRuntimeType, + + #[doc = "Disable LIFO slot of " $name " runtime."] + #[doc = ""] + #[doc = "Requires `tokio_unstable` compile-time flag."] + #[clap( + id = concat!(stringify!([<$name:lower>]), "_runtime_disable_lifo"), + long = concat!(stringify!([<$name:lower>]), "-runtime-disable-lifo-slot"), + env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_DISABLE_LIFO_SLOT"), + action + )] + pub disable_lifo: Option, + + #[doc = "Sets the number of scheduler ticks after which the scheduler of the " $name "tokio runtime"] + #[doc = "will poll for external events (timers, I/O, and so on)."] + #[clap( + id = concat!(stringify!([<$name:lower>]), "_runtime_event_interval"), + long = concat!(stringify!([<$name:lower>]), "-runtime-event-interval"), + env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_EVENT_INTERVAL"), + action + )] + pub event_interval: Option, + + #[doc = "Sets the number of scheduler ticks after which the scheduler of the " $name " runtime"] + #[doc = "will poll the global task queue."] + #[clap( + id = concat!(stringify!([<$name:lower>]), "_runtime_global_queue_interval"), + long = concat!(stringify!([<$name:lower>]), "-runtime-global-queue-interval"), + env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_GLOBAL_QUEUE_INTERVAL"), + action + )] + pub global_queue_interval: Option, + + #[doc = "Specifies the limit for additional threads spawned by the " $name " runtime."] + #[clap( + id = concat!(stringify!([<$name:lower>]), "_runtime_max_blocking_threads"), + long = concat!(stringify!([<$name:lower>]), "-runtime-max-blocking-threads"), + env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_MAX_BLOCKING_THREADS"), + action + )] + pub max_blocking_threads: Option, + + #[doc = "Configures the max number of events to be processed per tick by the tokio " $name " runtime."] + #[clap( + id = concat!(stringify!([<$name:lower>]), "_runtime_max_io_events_per_tick"), + long = concat!(stringify!([<$name:lower>]), "-runtime-max-io-events-per-tick"), + env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_MAX_IO_EVENTS_PER_TICK"), + action + )] + pub max_io_events_per_tick: Option, + + #[doc = "Sets a custom timeout for a thread in the blocking pool of the tokio " $name " runtime."] + #[clap( + id = concat!(stringify!([<$name:lower>]), "_runtime_thread_keep_alive"), + long = concat!(stringify!([<$name:lower>]), "-runtime-thread-keep-alive"), + env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_THREAD_KEEP_ALIVE"), + value_parser = humantime::parse_duration + )] + pub thread_keep_alive: Option, + + #[doc = "Set thread priority tokio " $name " runtime workers."] + #[clap( + id = concat!(stringify!([<$name:lower>]), "_runtime_thread_priority"), + long = concat!(stringify!([<$name:lower>]), "-runtime-thread-priority"), + env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_THREAD_PRIORITY"), + default_value = $default_thread_priority, + action, + )] + pub thread_priority: Option, + } + + impl [] { + /// Creates the tokio runtime builder. + pub fn builder(&self) -> Result<::tokio::runtime::Builder, std::io::Error> { + self.builder_with_name(stringify!($name)) + } + + /// Creates the tokio runtime builder. + pub fn builder_with_name(&self, name: &str) -> Result<::tokio::runtime::Builder, std::io::Error> { + // NOTE: no log macros will work here! + // + // That means use eprintln!() instead of error!() and so on. The log emitter + // requires a running tokio runtime and is initialised after this function. + + let mut builder = match self.runtime_type { + TokioRuntimeType::CurrentThread => tokio::runtime::Builder::new_current_thread(), + TokioRuntimeType::MultiThread => tokio::runtime::Builder::new_multi_thread(), + TokioRuntimeType::MultiThreadAlt => { + #[cfg(tokio_unstable)] + { + tokio::runtime::Builder::new_multi_thread_alt() + } + #[cfg(not(tokio_unstable))] + { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "multi-thread-alt runtime requires `tokio_unstable`", + )); + } + } + }; + + // enable subsystems + // - always enable timers + builder.enable_time(); + builder.enable_io(); + + // set up proper thread names + let thread_counter = Arc::new(AtomicUsize::new(1)); + let name = name.to_owned(); + builder.thread_name_fn(move || { + format!("InfluxDB 3.0 Tokio {} {}", name, thread_counter.fetch_add(1, Ordering::SeqCst)) + }); + + // worker thread count + let num_threads = match self.num_threads { + None => std::thread::available_parallelism() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?, + Some(n) => n, + }; + builder.worker_threads(num_threads.get()); + + if self.disable_lifo == Some(true) { + #[cfg(tokio_unstable)] + { + builder.disable_lifo_slot(); + } + #[cfg(not(tokio_unstable))] + { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "disabling LIFO slot requires `tokio_unstable`", + )); + } + } + + if let Some(x) = self.event_interval { + builder.event_interval(x.get()); + } + + if let Some(x) = self.global_queue_interval { + builder.global_queue_interval(x.get()); + } + + if let Some(x) = self.max_blocking_threads { + builder.max_blocking_threads(x.get()); + } + + if let Some(x) = self.max_io_events_per_tick { + builder.max_io_events_per_tick(x.get()); + } + + if let Some(x) = self.thread_keep_alive { + builder.thread_keep_alive(x); + } + + if let Some(x) = self.thread_priority { + #[cfg(unix)] + { + builder.on_thread_start(move || set_current_thread_priority(x)); + } + #[cfg(not(unix))] + { + use observability_deps::tracing::warn; + + // use warning instead of hard error to allow for easier default settings + warn!("Setting worker thread priority not supported on this platform"); + } + } + + Ok(builder) + } + } + } + }; +} + +tokio_rt_config!( + name = IO, + num_threads_arg = "num-threads", + num_threads_env = "INFLUXDB3_NUM_THREADS", + default_thread_priority = None, +); + +tokio_rt_config!( + name = Datafusion, + num_threads_arg = "datafusion-num-threads", + num_threads_env = "INFLUXDB3_DATAFUSION_NUM_THREADS", + default_thread_priority = "10", +); + +#[cfg(test)] +mod tests { + use super::*; + + use clap::Parser; + use futures::FutureExt; + use std::{ffi::OsString, future::Future}; + use tokio::net::TcpListener; + + #[cfg(unix)] + #[test] + fn test_thread_priority() { + assert_runtime_thread_property( + TokioDatafusionConfig::parse_from(std::iter::empty::()) + .builder() + .unwrap(), + || { + assert_eq!(get_current_thread_priority(), 10); + }, + ); + } + + #[test] + fn test_thread_name() { + assert_runtime_thread_property( + TokioIoConfig::parse_from(std::iter::empty::()) + .builder() + .unwrap(), + || { + assert_thread_name("InfluxDB 3.0 Tokio IO"); + }, + ); + assert_runtime_thread_property( + TokioDatafusionConfig::parse_from(std::iter::empty::()) + .builder() + .unwrap(), + || { + assert_thread_name("InfluxDB 3.0 Tokio Datafusion"); + }, + ); + assert_runtime_thread_property( + TokioDatafusionConfig::parse_from(std::iter::empty::()) + .builder_with_name("foo") + .unwrap(), + || { + assert_thread_name("InfluxDB 3.0 Tokio foo"); + }, + ); + } + + #[test] + fn test_io() { + assert_runtime_thread_property_async( + TokioIoConfig::parse_from(std::iter::empty::()) + .builder() + .unwrap(), + || async move { + assert!(is_io_enabled().await); + }, + ); + assert_runtime_thread_property_async( + TokioDatafusionConfig::parse_from(std::iter::empty::()) + .builder() + .unwrap(), + || async move { + assert!(is_io_enabled().await); + }, + ); + } + + #[track_caller] + fn assert_runtime_thread_property(builder: tokio::runtime::Builder, f: F) + where + F: FnOnce() + Send + 'static, + { + assert_runtime_thread_property_async(builder, || async move { f() }); + } + + #[track_caller] + fn assert_runtime_thread_property_async(mut builder: tokio::runtime::Builder, f: F) + where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + let rt = builder.build().unwrap(); + rt.block_on(async move { + tokio::spawn(async move { f().await }).await.unwrap(); + }); + } + + #[cfg(unix)] + fn get_current_thread_priority() -> i32 { + // on linux setpriority sets the current thread's priority + // (as opposed to the current process). + unsafe { libc::getpriority(0, 0) } + } + + #[track_caller] + fn assert_thread_name(prefix: &'static str) { + let thread = std::thread::current(); + let tname = thread.name().expect("thread is named"); + + assert!(tname.starts_with(prefix), "Invalid thread name: {tname}",); + } + + async fn is_io_enabled() -> bool { + // the only way (I've found) to test if IO is enabled is to use it and observer if tokio panics + TcpListener::bind("127.0.0.1:0") + .catch_unwind() + .await + .is_ok() + } +}