374 lines
14 KiB
Rust
374 lines
14 KiB
Rust
//! Config for the tokio main IO and DataFusion runtimes.
|
|
|
|
use std::{
|
|
num::{NonZeroU32, NonZeroUsize},
|
|
sync::{
|
|
Arc,
|
|
atomic::{AtomicUsize, Ordering},
|
|
},
|
|
time::Duration,
|
|
};
|
|
|
|
use paste::paste;
|
|
|
|
/// Tokio runtime type.
|
|
#[derive(Debug, Clone, Copy, Default, clap::ValueEnum)]
|
|
pub enum TokioRuntimeType {
|
|
/// Current-thread runtime.
|
|
CurrentThread,
|
|
|
|
/// Multi-thread runtime.
|
|
#[default]
|
|
MultiThread,
|
|
|
|
/// New, alternative multi-thread runtime.
|
|
///
|
|
/// Requires `tokio_unstable` compile-time flag.
|
|
MultiThreadAlt,
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
fn set_current_thread_priority(prio: i32) {
|
|
// on linux setpriority sets the current thread's priority
|
|
// (as opposed to the current process).
|
|
unsafe { libc::setpriority(0, 0, prio) };
|
|
}
|
|
|
|
// Generates the CLI config struct for one tokio runtime.
//
// For `name = Foo` this expands to `pub struct TokioFooConfig` (a `clap::Parser`)
// plus `builder()` / `builder_with_name()` methods that turn the parsed options
// into a `tokio::runtime::Builder`.
//
// Arguments:
// - `name`: runtime name; used for the struct name, the CLI flag / env var
//   prefixes and the default thread name.
// - `num_threads_arg` / `num_threads_env`: long flag and env var for the worker
//   thread count (these do not follow the `<name>-runtime-...` scheme).
// - `default_thread_priority`: value handed to clap's `default_value` for the
//   thread priority option.
macro_rules! tokio_rt_config {
    (
        name = $name:ident ,
        num_threads_arg = $num_threads_arg:expr ,
        num_threads_env = $num_threads_env:expr ,
        default_thread_priority = $default_thread_priority:expr,
    ) => {
        paste! {
            #[doc = "CLI config for tokio " $name " runtime."]
            #[derive(Debug, Clone, clap::Parser)]
            #[allow(missing_copy_implementations)]
            pub struct [<Tokio $name:camel Config>] {
                #[doc = "Set the maximum number of " $name " runtime threads to use."]
                #[doc = ""]
                #[doc = "Defaults to the number of logical cores on the system."]
                #[clap(
                    id = concat!(stringify!([<$name:lower>]), "_runtime_num_threads"),
                    long = $num_threads_arg,
                    env = $num_threads_env,
                    action
                )]
                pub num_threads: Option<NonZeroUsize>,

                #[doc = $name " tokio runtime type."]
                #[clap(
                    id = concat!(stringify!([<$name:lower>]), "_runtime_type"),
                    long = concat!(stringify!([<$name:lower>]), "-runtime-type"),
                    env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_TYPE"),
                    default_value_t = TokioRuntimeType::default(),
                    value_enum,
                    action
                )]
                pub runtime_type: TokioRuntimeType,

                #[doc = "Disable LIFO slot of " $name " runtime."]
                #[doc = ""]
                #[doc = "Requires `tokio_unstable` compile-time flag."]
                #[clap(
                    id = concat!(stringify!([<$name:lower>]), "_runtime_disable_lifo"),
                    long = concat!(stringify!([<$name:lower>]), "-runtime-disable-lifo-slot"),
                    env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_DISABLE_LIFO_SLOT"),
                    action
                )]
                pub disable_lifo: Option<bool>,

                #[doc = "Sets the number of scheduler ticks after which the scheduler of the " $name " tokio runtime"]
                #[doc = "will poll for external events (timers, I/O, and so on)."]
                #[clap(
                    id = concat!(stringify!([<$name:lower>]), "_runtime_event_interval"),
                    long = concat!(stringify!([<$name:lower>]), "-runtime-event-interval"),
                    env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_EVENT_INTERVAL"),
                    action
                )]
                pub event_interval: Option<NonZeroU32>,

                #[doc = "Sets the number of scheduler ticks after which the scheduler of the " $name " runtime"]
                #[doc = "will poll the global task queue."]
                #[clap(
                    id = concat!(stringify!([<$name:lower>]), "_runtime_global_queue_interval"),
                    long = concat!(stringify!([<$name:lower>]), "-runtime-global-queue-interval"),
                    env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_GLOBAL_QUEUE_INTERVAL"),
                    action
                )]
                pub global_queue_interval: Option<NonZeroU32>,

                #[doc = "Specifies the limit for additional threads spawned by the " $name " runtime."]
                #[clap(
                    id = concat!(stringify!([<$name:lower>]), "_runtime_max_blocking_threads"),
                    long = concat!(stringify!([<$name:lower>]), "-runtime-max-blocking-threads"),
                    env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_MAX_BLOCKING_THREADS"),
                    action
                )]
                pub max_blocking_threads: Option<NonZeroUsize>,

                #[doc = "Configures the max number of events to be processed per tick by the tokio " $name " runtime."]
                #[clap(
                    id = concat!(stringify!([<$name:lower>]), "_runtime_max_io_events_per_tick"),
                    long = concat!(stringify!([<$name:lower>]), "-runtime-max-io-events-per-tick"),
                    env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_MAX_IO_EVENTS_PER_TICK"),
                    action
                )]
                pub max_io_events_per_tick: Option<NonZeroUsize>,

                #[doc = "Sets a custom timeout for a thread in the blocking pool of the tokio " $name " runtime."]
                #[clap(
                    id = concat!(stringify!([<$name:lower>]), "_runtime_thread_keep_alive"),
                    long = concat!(stringify!([<$name:lower>]), "-runtime-thread-keep-alive"),
                    env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_THREAD_KEEP_ALIVE"),
                    value_parser = humantime::parse_duration
                )]
                pub thread_keep_alive: Option<Duration>,

                #[doc = "Set thread priority of tokio " $name " runtime workers."]
                #[clap(
                    id = concat!(stringify!([<$name:lower>]), "_runtime_thread_priority"),
                    long = concat!(stringify!([<$name:lower>]), "-runtime-thread-priority"),
                    env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_THREAD_PRIORITY"),
                    default_value = $default_thread_priority,
                    action,
                )]
                pub thread_priority: Option<i32>,
            }

            impl [<Tokio $name:camel Config>] {
                /// Creates the tokio runtime builder.
                ///
                /// Threads are named after the runtime this config was generated for.
                ///
                /// # Errors
                ///
                /// See [`Self::builder_with_name`].
                pub fn builder(&self) -> Result<::tokio::runtime::Builder, std::io::Error> {
                    self.builder_with_name(stringify!($name))
                }

                /// Creates the tokio runtime builder, using `name` as part of the
                /// thread names.
                ///
                /// # Errors
                ///
                /// Returns an error if an option that requires the `tokio_unstable`
                /// compile-time flag is requested without it, or if the number of
                /// available cores cannot be determined while no explicit thread
                /// count is configured.
                pub fn builder_with_name(&self, name: &str) -> Result<::tokio::runtime::Builder, std::io::Error> {
                    // NOTE: no log macros will work here!
                    //
                    // That means use eprintln!() instead of error!() and so on. The log emitter
                    // requires a running tokio runtime and is initialised after this function.

                    let mut builder = match self.runtime_type {
                        TokioRuntimeType::CurrentThread => tokio::runtime::Builder::new_current_thread(),
                        TokioRuntimeType::MultiThread => tokio::runtime::Builder::new_multi_thread(),
                        TokioRuntimeType::MultiThreadAlt => {
                            #[cfg(tokio_unstable)]
                            {
                                tokio::runtime::Builder::new_multi_thread_alt()
                            }
                            #[cfg(not(tokio_unstable))]
                            {
                                return Err(std::io::Error::new(
                                    std::io::ErrorKind::Other,
                                    "multi-thread-alt runtime requires `tokio_unstable`",
                                ));
                            }
                        }
                    };

                    // enable subsystems
                    // - always enable timers
                    // - always enable the IO driver
                    builder.enable_time();
                    builder.enable_io();

                    // set up proper thread names; the counter gives every thread a
                    // distinct numeric suffix, starting at 1
                    let thread_counter = Arc::new(AtomicUsize::new(1));
                    let name = name.to_owned();
                    builder.thread_name_fn(move || {
                        format!("InfluxDB 3 Core Tokio {} {}", name, thread_counter.fetch_add(1, Ordering::SeqCst))
                    });

                    // worker thread count: explicit setting wins, otherwise use
                    // the parallelism reported by the system
                    let num_threads = match self.num_threads {
                        None => std::thread::available_parallelism()
                            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
                        Some(n) => n,
                    };
                    builder.worker_threads(num_threads.get());

                    if self.disable_lifo == Some(true) {
                        #[cfg(tokio_unstable)]
                        {
                            builder.disable_lifo_slot();
                        }
                        #[cfg(not(tokio_unstable))]
                        {
                            return Err(std::io::Error::new(
                                std::io::ErrorKind::Other,
                                "disabling LIFO slot requires `tokio_unstable`",
                            ));
                        }
                    }

                    // The remaining knobs are only forwarded when set, so tokio's
                    // own defaults apply otherwise.
                    if let Some(x) = self.event_interval {
                        builder.event_interval(x.get());
                    }

                    if let Some(x) = self.global_queue_interval {
                        builder.global_queue_interval(x.get());
                    }

                    if let Some(x) = self.max_blocking_threads {
                        builder.max_blocking_threads(x.get());
                    }

                    if let Some(x) = self.max_io_events_per_tick {
                        builder.max_io_events_per_tick(x.get());
                    }

                    if let Some(x) = self.thread_keep_alive {
                        builder.thread_keep_alive(x);
                    }

                    // `x` is unused on platforms without thread priority support
                    #[allow(unused)]
                    if let Some(x) = self.thread_priority {
                        #[cfg(unix)]
                        {
                            builder.on_thread_start(move || set_current_thread_priority(x));
                        }
                        #[cfg(not(unix))]
                        {
                            // use warning instead of hard error to allow for easier default settings
                            //
                            // This has to go to stderr directly: as noted at the top of this
                            // function, log macros do not work yet at this point.
                            eprintln!("Setting worker thread priority not supported on this platform");
                        }
                    }

                    Ok(builder)
                }
            }
        }
    };
}
|
|
|
|
// Main IO runtime: expands to `TokioIoConfig`.
//
// The thread count uses the un-prefixed `--num-threads` flag, and no default
// thread priority is configured for this runtime.
tokio_rt_config! {
    name = IO,
    num_threads_arg = "num-threads",
    num_threads_env = "INFLUXDB3_NUM_THREADS",
    default_thread_priority = None,
}
|
|
|
|
// DataFusion runtime: expands to `TokioDatafusionConfig`.
//
// Unlike the IO runtime, its worker threads get a default priority of 10
// unless overridden on the command line or through the environment.
tokio_rt_config! {
    name = Datafusion,
    num_threads_arg = "datafusion-num-threads",
    num_threads_env = "INFLUXDB3_DATAFUSION_NUM_THREADS",
    default_thread_priority = "10",
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    use clap::Parser;
    use futures::FutureExt;
    use std::{ffi::OsString, future::Future};
    use tokio::net::TcpListener;

    /// Parses the IO runtime config from an empty argument list.
    fn io_config() -> TokioIoConfig {
        TokioIoConfig::parse_from(std::iter::empty::<OsString>())
    }

    /// Parses the DataFusion runtime config from an empty argument list.
    fn datafusion_config() -> TokioDatafusionConfig {
        TokioDatafusionConfig::parse_from(std::iter::empty::<OsString>())
    }

    /// The DataFusion runtime applies its default priority of 10 to its threads.
    #[cfg(unix)]
    #[test]
    fn test_thread_priority() {
        let builder = datafusion_config().builder().unwrap();
        assert_runtime_thread_property(builder, || {
            assert_eq!(get_current_thread_priority(), 10)
        });
    }

    /// Runtime threads are named after the runtime, or after the explicit name.
    #[test]
    fn test_thread_name() {
        let io_builder = io_config().builder().unwrap();
        assert_runtime_thread_property(io_builder, || {
            assert_thread_name("InfluxDB 3 Core Tokio IO")
        });

        let datafusion_builder = datafusion_config().builder().unwrap();
        assert_runtime_thread_property(datafusion_builder, || {
            assert_thread_name("InfluxDB 3 Core Tokio Datafusion")
        });

        let renamed_builder = datafusion_config().builder_with_name("foo").unwrap();
        assert_runtime_thread_property(renamed_builder, || {
            assert_thread_name("InfluxDB 3 Core Tokio foo")
        });
    }

    /// Both runtimes are built with the IO driver enabled.
    #[test]
    fn test_io() {
        let io_builder = io_config().builder().unwrap();
        assert_runtime_thread_property_async(io_builder, || async {
            assert!(is_io_enabled().await)
        });

        let datafusion_builder = datafusion_config().builder().unwrap();
        assert_runtime_thread_property_async(datafusion_builder, || async {
            assert!(is_io_enabled().await)
        });
    }

    /// Synchronous convenience wrapper around
    /// [`assert_runtime_thread_property_async`].
    #[track_caller]
    fn assert_runtime_thread_property<F>(builder: tokio::runtime::Builder, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        assert_runtime_thread_property_async(builder, || async move { f() });
    }

    /// Builds the runtime and runs `f` inside a task spawned onto it, so that
    /// assertions made by `f` observe the runtime's own thread setup.
    ///
    /// Panics if the runtime cannot be built or if the spawned task panics.
    #[track_caller]
    fn assert_runtime_thread_property_async<F, Fut>(mut builder: tokio::runtime::Builder, f: F)
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        let runtime = builder.build().unwrap();
        let check = async move {
            let task = tokio::spawn(async move { f().await });
            task.await.unwrap();
        };
        runtime.block_on(check);
    }

    /// Reads the priority back with the same `(0, 0)` target that
    /// `set_current_thread_priority` uses.
    #[cfg(unix)]
    fn get_current_thread_priority() -> i32 {
        // on linux getpriority with these arguments reports the current thread's
        // priority (as opposed to the current process), mirroring the setter.
        //
        // SAFETY: `getpriority` only takes plain integer arguments.
        unsafe { libc::getpriority(0, 0) }
    }

    /// Asserts that the current thread has a name starting with `prefix`.
    #[track_caller]
    fn assert_thread_name(prefix: &'static str) {
        let current = std::thread::current();
        let tname = current.name().expect("thread is named");

        assert!(tname.starts_with(prefix), "Invalid thread name: {tname}",);
    }

    /// Returns `true` if using tokio IO on the current runtime does not panic.
    async fn is_io_enabled() -> bool {
        // the only way (I've found) to test if IO is enabled is to use it and observe if tokio panics
        //
        // Only the panic is of interest here, not whether the bind itself succeeded.
        let outcome = TcpListener::bind("127.0.0.1:0").catch_unwind().await;
        outcome.is_ok()
    }
}
|