feat: accept relative memory size for memory pools (#8503)
Closes https://github.com/influxdata/idpe/issues/18006.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
parent
3612b1c482
commit
7e2f85a24e
|
@ -891,6 +891,7 @@ dependencies = [
|
|||
"object_store",
|
||||
"observability_deps",
|
||||
"snafu",
|
||||
"sysinfo",
|
||||
"tempfile",
|
||||
"test_helpers",
|
||||
"trace_exporters",
|
||||
|
|
|
@ -15,6 +15,7 @@ metric = { path = "../metric" }
|
|||
object_store = { workspace = true }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
snafu = "0.7"
|
||||
sysinfo = "0.29.8"
|
||||
trace_exporters = { path = "../trace_exporters" }
|
||||
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
|
||||
uuid = { version = "1", features = ["v4"] }
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
use crate::memory_size::MemorySize;
|
||||
|
||||
use super::compactor_scheduler::CompactorSchedulerConfig;
|
||||
|
||||
/// CLI config for compactor
|
||||
|
@ -62,13 +64,15 @@ pub struct CompactorConfig {
|
|||
/// If compaction plans attempt to allocate more than this many
|
||||
/// bytes during execution, they will error with
|
||||
/// "ResourcesExhausted".
|
||||
///
|
||||
/// Can be given as absolute value or in percentage of the total available memory (e.g. `10%`).
|
||||
#[clap(
|
||||
long = "exec-mem-pool-bytes",
|
||||
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
|
||||
default_value = "8589934592", // 8GB
|
||||
action
|
||||
)]
|
||||
pub exec_mem_pool_bytes: usize,
|
||||
pub exec_mem_pool_bytes: MemorySize,
|
||||
|
||||
/// Desired max size of compacted parquet files.
|
||||
///
|
||||
|
|
|
@ -25,6 +25,7 @@ pub mod garbage_collector;
|
|||
pub mod gossip;
|
||||
pub mod ingester;
|
||||
pub mod ingester_address;
|
||||
pub mod memory_size;
|
||||
pub mod object_store;
|
||||
pub mod querier;
|
||||
pub mod router;
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
//! Helper types to express memory size.
|
||||
|
||||
use std::{str::FromStr, sync::OnceLock};
|
||||
|
||||
use sysinfo::{RefreshKind, System, SystemExt};
|
||||
|
||||
/// Memory size.
|
||||
///
|
||||
/// # Parsing
|
||||
/// This can be parsed from strings in one of the following formats:
|
||||
///
|
||||
/// - **absolute:** just use a non-negative number to specify the absolute bytes, e.g. `1024`
|
||||
/// - **relative:** use a percentage between 0 and 100 (both inclusive) to specify a fraction of the total
|
||||
///   available memory size, e.g. `50%`
///
/// A relative value is converted into an absolute number of bytes while parsing, so the wrapped
/// `usize` always holds bytes.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct MemorySize(usize);
|
||||
|
||||
impl MemorySize {
|
||||
/// Number of bytes.
|
||||
pub fn bytes(&self) -> usize {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for MemorySize {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for MemorySize {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for MemorySize {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s.strip_suffix('%') {
|
||||
Some(s) => {
|
||||
let percentage = u64::from_str(s).map_err(|e| e.to_string())?;
|
||||
if percentage > 100 {
|
||||
return Err(format!(
|
||||
"relative memory size must be in [0, 100] but is {percentage}"
|
||||
));
|
||||
}
|
||||
let total = *TOTAL_MEM_BYTES.get_or_init(|| {
|
||||
let sys = System::new_with_specifics(RefreshKind::new().with_memory());
|
||||
sys.total_memory() as usize
|
||||
});
|
||||
let bytes = (percentage as f64 / 100f64 * total as f64).round() as usize;
|
||||
Ok(Self(bytes))
|
||||
}
|
||||
None => {
|
||||
let bytes = usize::from_str(s).map_err(|e| e.to_string())?;
|
||||
Ok(Self(bytes))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Total available memory size in bytes.
|
||||
///
|
||||
/// Keep this in a global state so that we only need to inspect the system once during IOx startup.
/// The value is filled in lazily, the first time a relative (`%`) memory size is parsed.
|
||||
static TOTAL_MEM_BYTES: OnceLock<usize> = OnceLock::new();
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers absolute values, relative values (both ends of the inclusive range) and malformed
    /// input, including empty strings.
    #[test]
    fn test_parse() {
        // Absolute byte counts.
        assert_ok("0", 0);
        assert_ok("1", 1);
        assert_ok("1024", 1024);

        // Relative sizes. Apart from `0%` the exact value depends on the host, so only check
        // that something non-zero comes back.
        assert_ok("0%", 0);
        assert_gt_zero("50%");
        assert_gt_zero("100%");

        // Malformed numbers.
        assert_err("-1", "invalid digit found in string");
        assert_err("foo", "invalid digit found in string");
        assert_err("-1%", "invalid digit found in string");
        assert_err("1.5%", "invalid digit found in string");

        // Missing number, with and without the percent sign.
        assert_err("", "cannot parse integer from empty string");
        assert_err("%", "cannot parse integer from empty string");

        // Out-of-range percentage.
        assert_err(
            "101%",
            "relative memory size must be in [0, 100] but is 101",
        );
    }

    /// Asserts that `s` parses to exactly `expected` bytes.
    #[track_caller]
    fn assert_ok(s: &'static str, expected: usize) {
        let parsed: MemorySize = s.parse().unwrap();
        assert_eq!(parsed.bytes(), expected);
    }

    /// Asserts that `s` parses to a non-zero number of bytes.
    #[track_caller]
    fn assert_gt_zero(s: &'static str) {
        let parsed: MemorySize = s.parse().unwrap();
        assert!(parsed.bytes() > 0);
    }

    /// Asserts that parsing `s` fails with exactly the message `expected`.
    #[track_caller]
    fn assert_err(s: &'static str, expected: &'static str) {
        let err = MemorySize::from_str(s).unwrap_err();
        assert_eq!(err, expected);
    }
}
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
use crate::{
|
||||
ingester_address::IngesterAddress,
|
||||
memory_size::MemorySize,
|
||||
single_tenant::{CONFIG_AUTHZ_ENV_NAME, CONFIG_AUTHZ_FLAG},
|
||||
};
|
||||
use std::{collections::HashMap, num::NonZeroUsize};
|
||||
|
@ -27,13 +28,15 @@ pub struct QuerierConfig {
|
|||
///
|
||||
/// If queries attempt to allocate more than this many bytes
|
||||
/// during execution, they will error with "ResourcesExhausted".
|
||||
///
|
||||
/// Can be given as absolute value or in percentage of the total available memory (e.g. `10%`).
|
||||
#[clap(
|
||||
long = "exec-mem-pool-bytes",
|
||||
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
|
||||
default_value = "8589934592", // 8GB
|
||||
action
|
||||
)]
|
||||
pub exec_mem_pool_bytes: usize,
|
||||
pub exec_mem_pool_bytes: MemorySize,
|
||||
|
||||
/// gRPC address for the router to talk with the ingesters. For
|
||||
/// example:
|
||||
|
@ -55,22 +58,26 @@ pub struct QuerierConfig {
|
|||
pub ingester_addresses: Vec<IngesterAddress>,
|
||||
|
||||
/// Size of the RAM cache used to store catalog metadata information in bytes.
|
||||
///
|
||||
/// Can be given as absolute value or in percentage of the total available memory (e.g. `10%`).
|
||||
#[clap(
|
||||
long = "ram-pool-metadata-bytes",
|
||||
env = "INFLUXDB_IOX_RAM_POOL_METADATA_BYTES",
|
||||
default_value = "134217728", // 128MB
|
||||
action
|
||||
)]
|
||||
pub ram_pool_metadata_bytes: usize,
|
||||
pub ram_pool_metadata_bytes: MemorySize,
|
||||
|
||||
/// Size of the RAM cache used to store data in bytes.
|
||||
///
|
||||
/// Can be given as absolute value or in percentage of the total available memory (e.g. `10%`).
|
||||
#[clap(
|
||||
long = "ram-pool-data-bytes",
|
||||
env = "INFLUXDB_IOX_RAM_POOL_DATA_BYTES",
|
||||
default_value = "1073741824", // 1GB
|
||||
action
|
||||
)]
|
||||
pub ram_pool_data_bytes: usize,
|
||||
pub ram_pool_data_bytes: MemorySize,
|
||||
|
||||
/// Limit the number of concurrent queries.
|
||||
#[clap(
|
||||
|
@ -115,29 +122,6 @@ pub struct QuerierConfig {
|
|||
pub datafusion_config: HashMap<String, String>,
|
||||
}
|
||||
|
||||
impl QuerierConfig {
|
||||
/// Get the querier config's num query threads.
|
||||
#[must_use]
|
||||
pub fn num_query_threads(&self) -> Option<NonZeroUsize> {
|
||||
self.num_query_threads
|
||||
}
|
||||
|
||||
/// Size of the RAM cache pool for metadata in bytes.
|
||||
pub fn ram_pool_metadata_bytes(&self) -> usize {
|
||||
self.ram_pool_metadata_bytes
|
||||
}
|
||||
|
||||
/// Size of the RAM cache pool for payload in bytes.
|
||||
pub fn ram_pool_data_bytes(&self) -> usize {
|
||||
self.ram_pool_data_bytes
|
||||
}
|
||||
|
||||
/// Number of queries allowed to run concurrently
|
||||
pub fn max_concurrent_queries(&self) -> usize {
|
||||
self.max_concurrent_queries
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_datafusion_config(
|
||||
s: &str,
|
||||
) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync + 'static>> {
|
||||
|
@ -179,7 +163,7 @@ mod tests {
|
|||
fn test_default() {
|
||||
let actual = QuerierConfig::try_parse_from(["my_binary"]).unwrap();
|
||||
|
||||
assert_eq!(actual.num_query_threads(), None);
|
||||
assert_eq!(actual.num_query_threads, None);
|
||||
assert!(actual.ingester_addresses.is_empty());
|
||||
assert!(actual.datafusion_config.is_empty());
|
||||
}
|
||||
|
@ -190,7 +174,7 @@ mod tests {
|
|||
QuerierConfig::try_parse_from(["my_binary", "--num-query-threads", "42"]).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
actual.num_query_threads(),
|
||||
actual.num_query_threads,
|
||||
Some(NonZeroUsize::new(42).unwrap())
|
||||
);
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ use clap_blocks::{
|
|||
gossip::GossipConfig,
|
||||
ingester::IngesterConfig,
|
||||
ingester_address::IngesterAddress,
|
||||
memory_size::MemorySize,
|
||||
object_store::{make_object_store, ObjectStoreConfig},
|
||||
querier::QuerierConfig,
|
||||
router::RouterConfig,
|
||||
|
@ -319,22 +320,26 @@ pub struct Config {
|
|||
compactor_scheduler_config: CompactorSchedulerConfig,
|
||||
|
||||
/// Size of the querier RAM cache used to store catalog metadata information in bytes.
|
||||
///
|
||||
/// Can be given as absolute value or in percentage of the total available memory (e.g. `10%`).
|
||||
#[clap(
|
||||
long = "querier-ram-pool-metadata-bytes",
|
||||
env = "INFLUXDB_IOX_QUERIER_RAM_POOL_METADATA_BYTES",
|
||||
default_value = "134217728", // 128MB
|
||||
action
|
||||
)]
|
||||
pub querier_ram_pool_metadata_bytes: usize,
|
||||
pub querier_ram_pool_metadata_bytes: MemorySize,
|
||||
|
||||
/// Size of the querier RAM cache used to store data in bytes.
|
||||
///
|
||||
/// Can be given as absolute value or in percentage of the total available memory (e.g. `10%`).
|
||||
#[clap(
|
||||
long = "querier-ram-pool-data-bytes",
|
||||
env = "INFLUXDB_IOX_QUERIER_RAM_POOL_DATA_BYTES",
|
||||
default_value = "1073741824", // 1GB
|
||||
action
|
||||
)]
|
||||
pub querier_ram_pool_data_bytes: usize,
|
||||
pub querier_ram_pool_data_bytes: MemorySize,
|
||||
|
||||
/// Limit the number of concurrent queries.
|
||||
#[clap(
|
||||
|
@ -346,13 +351,15 @@ pub struct Config {
|
|||
pub querier_max_concurrent_queries: usize,
|
||||
|
||||
/// Size of memory pool used during query exec, in bytes.
|
||||
///
|
||||
/// Can be given as absolute value or in percentage of the total available memory (e.g. `10%`).
|
||||
#[clap(
|
||||
long = "exec-mem-pool-bytes",
|
||||
env = "INFLUXDB_IOX_EXEC_MEM_POOL_BYTES",
|
||||
default_value = "8589934592", // 8GB
|
||||
action
|
||||
)]
|
||||
pub exec_mem_pool_bytes: usize,
|
||||
pub exec_mem_pool_bytes: MemorySize,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
|
@ -629,7 +636,7 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
.map(|store| (store.id(), Arc::clone(store.object_store())))
|
||||
.collect(),
|
||||
metric_registry: Arc::clone(&metrics),
|
||||
mem_pool_size: querier_config.exec_mem_pool_bytes,
|
||||
mem_pool_size: querier_config.exec_mem_pool_bytes.bytes(),
|
||||
}));
|
||||
|
||||
info!("starting router");
|
||||
|
|
|
@ -113,7 +113,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
|
|||
.map(|store| (store.id(), Arc::clone(store.object_store())))
|
||||
.collect(),
|
||||
metric_registry: Arc::clone(&metric_registry),
|
||||
mem_pool_size: config.compactor_config.exec_mem_pool_bytes,
|
||||
mem_pool_size: config.compactor_config.exec_mem_pool_bytes.bytes(),
|
||||
}));
|
||||
let time_provider = Arc::new(SystemProvider::new());
|
||||
|
||||
|
|
|
@ -92,7 +92,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
|
|||
|
||||
let time_provider = Arc::new(SystemProvider::new());
|
||||
|
||||
let num_query_threads = config.querier_config.num_query_threads();
|
||||
let num_query_threads = config.querier_config.num_query_threads;
|
||||
let num_threads = num_query_threads.unwrap_or_else(|| {
|
||||
NonZeroUsize::new(num_cpus::get()).unwrap_or_else(|| NonZeroUsize::new(1).unwrap())
|
||||
});
|
||||
|
@ -103,7 +103,7 @@ pub async fn command(config: Config) -> Result<(), Error> {
|
|||
|
||||
let exec = Arc::new(Executor::new(
|
||||
num_threads,
|
||||
config.querier_config.exec_mem_pool_bytes,
|
||||
config.querier_config.exec_mem_pool_bytes.bytes(),
|
||||
Arc::clone(&metric_registry),
|
||||
));
|
||||
|
||||
|
|
|
@ -204,8 +204,8 @@ pub async fn create_querier_server_type(
|
|||
args.time_provider,
|
||||
Arc::clone(&args.metric_registry),
|
||||
Arc::clone(&args.object_store),
|
||||
args.querier_config.ram_pool_metadata_bytes(),
|
||||
args.querier_config.ram_pool_data_bytes(),
|
||||
args.querier_config.ram_pool_metadata_bytes.bytes(),
|
||||
args.querier_config.ram_pool_data_bytes.bytes(),
|
||||
&Handle::current(),
|
||||
));
|
||||
|
||||
|
@ -261,7 +261,7 @@ pub async fn create_querier_server_type(
|
|||
Arc::clone(&args.metric_registry),
|
||||
args.exec,
|
||||
ingester_connections,
|
||||
args.querier_config.max_concurrent_queries(),
|
||||
args.querier_config.max_concurrent_queries,
|
||||
Arc::new(args.querier_config.datafusion_config),
|
||||
)
|
||||
.await?,
|
||||
|
|
Loading…
Reference in New Issue