feat: add influxdb3_clap_blocks crate with runtime config (#25665)

* feat: add influxdb3_clap_blocks with runtime config

Added a new workspace crate `influxdb3_clap_blocks` which will be a
starting point for adding InfluxDB 3 OSS/Pro specific CLI configuration
that no longer references IOx, and allows for us to trim out unneeded
configurations for the monolithic InfluxDB 3.

Other than changing references from IOX to INFLUXDB3, this makes one
important change: it enables IO on the DataFusion runtime. This, for now,
is an experimental change to see if we can relieve some concurrency
issues that we have been experiencing.

* chore: add observability deps for windows
pull/25673/head
Trevor Hilton 2024-12-16 12:31:55 -08:00 committed by GitHub
parent 238642b1c0
commit 7d92b75731
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 422 additions and 45 deletions

14
Cargo.lock generated
View File

@ -3084,6 +3084,7 @@ dependencies = [
"hyper 0.14.31",
"influxdb3_cache",
"influxdb3_catalog",
"influxdb3_clap_blocks",
"influxdb3_client",
"influxdb3_process",
"influxdb3_server",
@ -3184,6 +3185,19 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "influxdb3_clap_blocks"
version = "0.1.0"
dependencies = [
"clap",
"futures",
"humantime",
"libc",
"observability_deps",
"paste",
"tokio",
]
[[package]]
name = "influxdb3_client"
version = "0.1.0"

View File

@ -3,7 +3,7 @@
members = [
"influxdb3",
"influxdb3_cache",
"influxdb3_catalog",
"influxdb3_catalog", "influxdb3_clap_blocks",
"influxdb3_client",
"influxdb3_id",
"influxdb3_load_generator",
@ -82,6 +82,7 @@ mockall = { version = "0.13.0" }
num_cpus = "1.16.0"
object_store = "0.11.1"
parking_lot = "0.12.1"
paste = "1.0.15"
parquet = { version = "53.0.0", features = ["object_store"] }
pbjson = "0.6.0"
pbjson-build = "0.6.2"

View File

@ -27,6 +27,7 @@ trogging.workspace = true
influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" }
influxdb3_clap_blocks = { path = "../influxdb3_clap_blocks" }
influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_server = { path = "../influxdb3_server" }
influxdb3_wal = { path = "../influxdb3_wal" }

View File

@ -5,7 +5,6 @@ use clap_blocks::{
memory_size::MemorySize,
object_store::{make_object_store, ObjectStoreConfig, ObjectStoreType},
socket_addr::SocketAddr,
tokio::TokioDatafusionConfig,
};
use datafusion_util::config::register_iox_object_store;
use influxdb3_cache::{
@ -13,6 +12,7 @@ use influxdb3_cache::{
meta_cache::MetaCacheProvider,
parquet_cache::create_cached_obj_store_and_oracle,
};
use influxdb3_clap_blocks::tokio::TokioDatafusionConfig;
use influxdb3_process::{
build_malloc_conf, setup_metric_registry, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_UUID,
};

View File

@ -11,13 +11,9 @@ clippy::future_not_send
)]
use dotenvy::dotenv;
use influxdb3_clap_blocks::tokio::TokioIoConfig;
use influxdb3_process::VERSION_STRING;
use observability_deps::tracing::warn;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::runtime::Runtime;
use trogging::{
cli::LoggingConfigBuilderExt,
tracing_subscriber::{prelude::*, Registry},
@ -69,6 +65,9 @@ Examples:
"#
)]
struct Config {
#[clap(flatten)]
runtime_config: TokioIoConfig,
#[clap(subcommand)]
command: Option<Command>,
}
@ -113,7 +112,8 @@ fn main() -> Result<(), std::io::Error> {
let config: Config = clap::Parser::parse();
let tokio_runtime = get_runtime(None)?;
let tokio_runtime = config.runtime_config.builder()?.build()?;
tokio_runtime.block_on(async move {
fn handle_init_logs(r: Result<TroggingGuard, trogging::Error>) -> TroggingGuard {
match r {
@ -183,43 +183,6 @@ fn main() -> Result<(), std::io::Error> {
Ok(())
}
/// Creates the tokio runtime for executing IOx
///
/// if nthreads is none, uses the default scheduler
/// otherwise, creates a scheduler with the number of threads
fn get_runtime(num_threads: Option<usize>) -> Result<Runtime, std::io::Error> {
// NOTE: no log macros will work here!
//
// That means use eprintln!() instead of error!() and so on. The log emitter
// requires a running tokio runtime and is initialised after this function.
use tokio::runtime::Builder;
let kind = std::io::ErrorKind::Other;
match num_threads {
None => Runtime::new(),
Some(num_threads) => {
println!("Setting number of threads to '{num_threads}' per command line request");
let thread_counter = Arc::new(AtomicUsize::new(1));
match num_threads {
0 => {
let msg =
format!("Invalid num-threads: '{num_threads}' must be greater than zero");
Err(std::io::Error::new(kind, msg))
}
1 => Builder::new_current_thread().enable_all().build(),
_ => Builder::new_multi_thread()
.enable_all()
.thread_name_fn(move || {
format!("IOx main {}", thread_counter.fetch_add(1, Ordering::SeqCst))
})
.worker_threads(num_threads)
.build(),
}
}
}
}
/// Source the .env file before initialising the Config struct - this sets
/// any envs in the file, which the Config struct then uses.
///

View File

@ -0,0 +1,23 @@
[package]
name = "influxdb3_clap_blocks"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
# core crate dependencies
observability_deps.workspace = true
# crates.io dependencies
clap.workspace = true
humantime.workspace = true
libc.workspace = true
paste.workspace = true
tokio.workspace = true
[dev-dependencies]
futures.workspace = true
[lints]
workspace = true

View File

@ -0,0 +1,3 @@
//! Configuration options for the `influxdb3` CLI which uses the `clap` crate
pub mod tokio;

View File

@ -0,0 +1,372 @@
//! Config for the tokio main IO and DataFusion runtimes.
use std::{
num::{NonZeroU32, NonZeroUsize},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use paste::paste;
/// Tokio runtime type.
#[derive(Debug, Clone, Copy, Default, clap::ValueEnum)]
pub enum TokioRuntimeType {
/// Current-thread runtime.
CurrentThread,
/// Multi-thread runtime.
#[default]
MultiThread,
/// New, alternative multi-thread runtime.
///
/// Requires `tokio_unstable` compile-time flag.
MultiThreadAlt,
}
#[cfg(unix)]
fn set_current_thread_priority(prio: i32) {
// on linux setpriority sets the current thread's priority
// (as opposed to the current process).
unsafe { libc::setpriority(0, 0, prio) };
}
macro_rules! tokio_rt_config {
(
name = $name:ident ,
num_threads_arg = $num_threads_arg:expr ,
num_threads_env = $num_threads_env:expr ,
default_thread_priority = $default_thread_priority:expr,
) => {
paste! {
#[doc = "CLI config for tokio " $name " runtime."]
#[derive(Debug, Clone, clap::Parser)]
#[allow(missing_copy_implementations)]
pub struct [<Tokio $name:camel Config>] {
#[doc = "Set the maximum number of " $name " runtime threads to use."]
#[doc = ""]
#[doc = "Defaults to the number of logical cores on the system."]
#[clap(
id = concat!(stringify!([<$name:lower>]), "_runtime_num_threads"),
long = $num_threads_arg,
env = $num_threads_env,
action
)]
pub num_threads: Option<NonZeroUsize>,
#[doc = $name " tokio runtime type."]
#[clap(
id = concat!(stringify!([<$name:lower>]), "_runtime_type"),
long = concat!(stringify!([<$name:lower>]), "-runtime-type"),
env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_TYPE"),
default_value_t = TokioRuntimeType::default(),
value_enum,
action
)]
pub runtime_type: TokioRuntimeType,
#[doc = "Disable LIFO slot of " $name " runtime."]
#[doc = ""]
#[doc = "Requires `tokio_unstable` compile-time flag."]
#[clap(
id = concat!(stringify!([<$name:lower>]), "_runtime_disable_lifo"),
long = concat!(stringify!([<$name:lower>]), "-runtime-disable-lifo-slot"),
env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_DISABLE_LIFO_SLOT"),
action
)]
pub disable_lifo: Option<bool>,
#[doc = "Sets the number of scheduler ticks after which the scheduler of the " $name "tokio runtime"]
#[doc = "will poll for external events (timers, I/O, and so on)."]
#[clap(
id = concat!(stringify!([<$name:lower>]), "_runtime_event_interval"),
long = concat!(stringify!([<$name:lower>]), "-runtime-event-interval"),
env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_EVENT_INTERVAL"),
action
)]
pub event_interval: Option<NonZeroU32>,
#[doc = "Sets the number of scheduler ticks after which the scheduler of the " $name " runtime"]
#[doc = "will poll the global task queue."]
#[clap(
id = concat!(stringify!([<$name:lower>]), "_runtime_global_queue_interval"),
long = concat!(stringify!([<$name:lower>]), "-runtime-global-queue-interval"),
env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_GLOBAL_QUEUE_INTERVAL"),
action
)]
pub global_queue_interval: Option<NonZeroU32>,
#[doc = "Specifies the limit for additional threads spawned by the " $name " runtime."]
#[clap(
id = concat!(stringify!([<$name:lower>]), "_runtime_max_blocking_threads"),
long = concat!(stringify!([<$name:lower>]), "-runtime-max-blocking-threads"),
env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_MAX_BLOCKING_THREADS"),
action
)]
pub max_blocking_threads: Option<NonZeroUsize>,
#[doc = "Configures the max number of events to be processed per tick by the tokio " $name " runtime."]
#[clap(
id = concat!(stringify!([<$name:lower>]), "_runtime_max_io_events_per_tick"),
long = concat!(stringify!([<$name:lower>]), "-runtime-max-io-events-per-tick"),
env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_MAX_IO_EVENTS_PER_TICK"),
action
)]
pub max_io_events_per_tick: Option<NonZeroUsize>,
#[doc = "Sets a custom timeout for a thread in the blocking pool of the tokio " $name " runtime."]
#[clap(
id = concat!(stringify!([<$name:lower>]), "_runtime_thread_keep_alive"),
long = concat!(stringify!([<$name:lower>]), "-runtime-thread-keep-alive"),
env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_THREAD_KEEP_ALIVE"),
value_parser = humantime::parse_duration
)]
pub thread_keep_alive: Option<Duration>,
#[doc = "Set thread priority tokio " $name " runtime workers."]
#[clap(
id = concat!(stringify!([<$name:lower>]), "_runtime_thread_priority"),
long = concat!(stringify!([<$name:lower>]), "-runtime-thread-priority"),
env = concat!("INFLUXDB3_", stringify!([<$name:upper>]), "_RUNTIME_THREAD_PRIORITY"),
default_value = $default_thread_priority,
action,
)]
pub thread_priority: Option<i32>,
}
impl [<Tokio $name:camel Config>] {
/// Creates the tokio runtime builder.
pub fn builder(&self) -> Result<::tokio::runtime::Builder, std::io::Error> {
self.builder_with_name(stringify!($name))
}
/// Creates the tokio runtime builder.
pub fn builder_with_name(&self, name: &str) -> Result<::tokio::runtime::Builder, std::io::Error> {
// NOTE: no log macros will work here!
//
// That means use eprintln!() instead of error!() and so on. The log emitter
// requires a running tokio runtime and is initialised after this function.
let mut builder = match self.runtime_type {
TokioRuntimeType::CurrentThread => tokio::runtime::Builder::new_current_thread(),
TokioRuntimeType::MultiThread => tokio::runtime::Builder::new_multi_thread(),
TokioRuntimeType::MultiThreadAlt => {
#[cfg(tokio_unstable)]
{
tokio::runtime::Builder::new_multi_thread_alt()
}
#[cfg(not(tokio_unstable))]
{
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"multi-thread-alt runtime requires `tokio_unstable`",
));
}
}
};
// enable subsystems
// - always enable timers
builder.enable_time();
builder.enable_io();
// set up proper thread names
let thread_counter = Arc::new(AtomicUsize::new(1));
let name = name.to_owned();
builder.thread_name_fn(move || {
format!("InfluxDB 3.0 Tokio {} {}", name, thread_counter.fetch_add(1, Ordering::SeqCst))
});
// worker thread count
let num_threads = match self.num_threads {
None => std::thread::available_parallelism()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
Some(n) => n,
};
builder.worker_threads(num_threads.get());
if self.disable_lifo == Some(true) {
#[cfg(tokio_unstable)]
{
builder.disable_lifo_slot();
}
#[cfg(not(tokio_unstable))]
{
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"disabling LIFO slot requires `tokio_unstable`",
));
}
}
if let Some(x) = self.event_interval {
builder.event_interval(x.get());
}
if let Some(x) = self.global_queue_interval {
builder.global_queue_interval(x.get());
}
if let Some(x) = self.max_blocking_threads {
builder.max_blocking_threads(x.get());
}
if let Some(x) = self.max_io_events_per_tick {
builder.max_io_events_per_tick(x.get());
}
if let Some(x) = self.thread_keep_alive {
builder.thread_keep_alive(x);
}
if let Some(x) = self.thread_priority {
#[cfg(unix)]
{
builder.on_thread_start(move || set_current_thread_priority(x));
}
#[cfg(not(unix))]
{
use observability_deps::tracing::warn;
// use warning instead of hard error to allow for easier default settings
warn!("Setting worker thread priority not supported on this platform");
}
}
Ok(builder)
}
}
}
};
}
tokio_rt_config!(
name = IO,
num_threads_arg = "num-threads",
num_threads_env = "INFLUXDB3_NUM_THREADS",
default_thread_priority = None,
);
tokio_rt_config!(
name = Datafusion,
num_threads_arg = "datafusion-num-threads",
num_threads_env = "INFLUXDB3_DATAFUSION_NUM_THREADS",
default_thread_priority = "10",
);
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
use futures::FutureExt;
use std::{ffi::OsString, future::Future};
use tokio::net::TcpListener;
#[cfg(unix)]
#[test]
fn test_thread_priority() {
assert_runtime_thread_property(
TokioDatafusionConfig::parse_from(std::iter::empty::<OsString>())
.builder()
.unwrap(),
|| {
assert_eq!(get_current_thread_priority(), 10);
},
);
}
#[test]
fn test_thread_name() {
assert_runtime_thread_property(
TokioIoConfig::parse_from(std::iter::empty::<OsString>())
.builder()
.unwrap(),
|| {
assert_thread_name("InfluxDB 3.0 Tokio IO");
},
);
assert_runtime_thread_property(
TokioDatafusionConfig::parse_from(std::iter::empty::<OsString>())
.builder()
.unwrap(),
|| {
assert_thread_name("InfluxDB 3.0 Tokio Datafusion");
},
);
assert_runtime_thread_property(
TokioDatafusionConfig::parse_from(std::iter::empty::<OsString>())
.builder_with_name("foo")
.unwrap(),
|| {
assert_thread_name("InfluxDB 3.0 Tokio foo");
},
);
}
#[test]
fn test_io() {
assert_runtime_thread_property_async(
TokioIoConfig::parse_from(std::iter::empty::<OsString>())
.builder()
.unwrap(),
|| async move {
assert!(is_io_enabled().await);
},
);
assert_runtime_thread_property_async(
TokioDatafusionConfig::parse_from(std::iter::empty::<OsString>())
.builder()
.unwrap(),
|| async move {
assert!(is_io_enabled().await);
},
);
}
#[track_caller]
fn assert_runtime_thread_property<F>(builder: tokio::runtime::Builder, f: F)
where
F: FnOnce() + Send + 'static,
{
assert_runtime_thread_property_async(builder, || async move { f() });
}
#[track_caller]
fn assert_runtime_thread_property_async<F, Fut>(mut builder: tokio::runtime::Builder, f: F)
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let rt = builder.build().unwrap();
rt.block_on(async move {
tokio::spawn(async move { f().await }).await.unwrap();
});
}
#[cfg(unix)]
fn get_current_thread_priority() -> i32 {
// on linux setpriority sets the current thread's priority
// (as opposed to the current process).
unsafe { libc::getpriority(0, 0) }
}
#[track_caller]
fn assert_thread_name(prefix: &'static str) {
let thread = std::thread::current();
let tname = thread.name().expect("thread is named");
assert!(tname.starts_with(prefix), "Invalid thread name: {tname}",);
}
async fn is_io_enabled() -> bool {
// the only way (I've found) to test if IO is enabled is to use it and observer if tokio panics
TcpListener::bind("127.0.0.1:0")
.catch_unwind()
.await
.is_ok()
}
}