feat(iox): Implement max_active_compactions_cpu_fraction
parent
ad68817513
commit
fe7f65bfa7
|
@ -146,9 +146,7 @@ pub struct LifecycleRules {
|
|||
pub worker_backoff_millis: NonZeroU64,
|
||||
|
||||
/// The maximum number of permitted concurrently executing compactions.
|
||||
/// It is not currently possible to set a limit that disables compactions
|
||||
/// entirely, nor is it possible to set an "unlimited" value.
|
||||
pub max_active_compactions: NonZeroU32,
|
||||
pub max_active_compactions: MaxActiveCompactions,
|
||||
|
||||
/// After how many transactions should IOx write a new checkpoint?
|
||||
pub catalog_transactions_until_checkpoint: NonZeroU64,
|
||||
|
@ -173,6 +171,52 @@ pub struct LifecycleRules {
|
|||
pub parquet_cache_limit: Option<NonZeroU64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
|
||||
pub enum MaxActiveCompactions {
|
||||
/// The maximum number of permitted concurrently executing compactions.
|
||||
/// It is not currently possible to set a limit that disables compactions
|
||||
/// entirely, nor is it possible to set an "unlimited" value.
|
||||
MaxActiveCompactions(NonZeroU32),
|
||||
|
||||
// The maximum number of concurrent active compactions that can run
|
||||
// expressed as a fraction of the available cpus (rounded to the next smallest non-zero integer).
|
||||
MaxActiveCompactionsCpuFraction {
|
||||
fraction: f32,
|
||||
effective: NonZeroU32,
|
||||
},
|
||||
}
|
||||
|
||||
impl MaxActiveCompactions {
|
||||
pub fn new(fraction: f32) -> Self {
|
||||
let cpus = num_cpus::get() as f32 * fraction;
|
||||
let effective = (cpus as u32).saturating_sub(1) + 1;
|
||||
let effective = NonZeroU32::new(effective).unwrap();
|
||||
Self::MaxActiveCompactionsCpuFraction {
|
||||
fraction,
|
||||
effective,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self) -> u32 {
|
||||
match self {
|
||||
Self::MaxActiveCompactions(effective) => effective,
|
||||
Self::MaxActiveCompactionsCpuFraction { effective, .. } => effective,
|
||||
}
|
||||
.get()
|
||||
}
|
||||
}
|
||||
|
||||
// Defaults to number of CPUs.
|
||||
impl Default for MaxActiveCompactions {
|
||||
fn default() -> Self {
|
||||
Self::new(1.0)
|
||||
}
|
||||
}
|
||||
|
||||
// Required because database rules must be Eq but cannot derive Eq for Self
|
||||
// since f32 is not Eq.
|
||||
// NOTE(review): `fraction` is an `f32` and the derived `PartialEq` treats NaN as
// unequal to itself, so a value holding a NaN fraction would break the reflexivity
// that `Eq` promises — confirm that NaN can never reach this type.
impl Eq for MaxActiveCompactions {}
|
||||
|
||||
impl LifecycleRules {
|
||||
/// The max timestamp skew across concurrent writers before persisted chunks might overlap
|
||||
pub fn late_arrive_window(&self) -> Duration {
|
||||
|
@ -189,7 +233,7 @@ impl Default for LifecycleRules {
|
|||
persist: false,
|
||||
immutable: false,
|
||||
worker_backoff_millis: NonZeroU64::new(DEFAULT_WORKER_BACKOFF_MILLIS).unwrap(),
|
||||
max_active_compactions: NonZeroU32::new(num_cpus::get() as u32).unwrap(), // defaults to number of CPU threads
|
||||
max_active_compactions: Default::default(),
|
||||
catalog_transactions_until_checkpoint: NonZeroU64::new(
|
||||
DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
|
||||
)
|
||||
|
@ -734,4 +778,20 @@ mod tests {
|
|||
|
||||
ensure(DatabaseRules::new(DatabaseName::new("bananas").unwrap()));
|
||||
}
|
||||
|
||||
/// Checks that the CPU-fraction limit scales with the fraction and never drops to zero.
#[test]
fn test_max_active_compactions_cpu_fraction() {
    let n = MaxActiveCompactions::new(1.0);
    let cpus = n.get();

    let n = MaxActiveCompactions::new(0.5);
    let half_cpus = n.get();

    // The limit is clamped to at least 1, so on a single-CPU host "half" is
    // still 1 whereas a bare `cpus / 2` would be 0 and fail the assertion.
    assert_eq!(half_cpus, (cpus / 2).max(1));

    let n = MaxActiveCompactions::new(0.0);
    let non_zero = n.get();

    assert_eq!(non_zero, 1);
}
|
||||
}
|
||||
|
|
|
@ -78,11 +78,14 @@ message LifecycleRules {
|
|||
// Maximum number of rows to buffer in a MUB chunk before compacting it
|
||||
uint64 mub_row_threshold = 15;
|
||||
|
||||
// The maximum number of concurrent active compactions that can run.
|
||||
//
|
||||
// If 0, compactions are limited to the default number.
|
||||
// See data_types::database_rules::DEFAULT_MAX_ACTIVE_COMPACTIONS
|
||||
uint32 max_active_compactions = 16;
|
||||
oneof max_active_compactions_cfg {
|
||||
// The maximum number of concurrent active compactions that can run.
|
||||
uint32 max_active_compactions = 16;
|
||||
|
||||
// The maximum number of concurrent active compactions that can run
|
||||
// expressed as a fraction of the available CPUs (rounded down to a whole number, but never less than 1).
|
||||
float max_active_compactions_cpu_fraction = 18;
|
||||
}
|
||||
|
||||
// Use up to this amount of space in bytes for caching Parquet files.
|
||||
// A value of 0 disables Parquet caching
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
use crate::google::FromFieldOpt;
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
|
||||
|
||||
use data_types::database_rules::{
|
||||
LifecycleRules, DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
|
||||
LifecycleRules, MaxActiveCompactions, DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
|
||||
DEFAULT_LATE_ARRIVE_WINDOW_SECONDS, DEFAULT_MUB_ROW_THRESHOLD,
|
||||
DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS, DEFAULT_PERSIST_ROW_THRESHOLD,
|
||||
DEFAULT_WORKER_BACKOFF_MILLIS,
|
||||
|
@ -13,7 +14,6 @@ use crate::influxdata::iox::management::v1 as management;
|
|||
|
||||
impl From<LifecycleRules> for management::LifecycleRules {
|
||||
fn from(config: LifecycleRules) -> Self {
|
||||
#[allow(deprecated)]
|
||||
Self {
|
||||
buffer_size_soft: config
|
||||
.buffer_size_soft
|
||||
|
@ -27,7 +27,7 @@ impl From<LifecycleRules> for management::LifecycleRules {
|
|||
persist: config.persist,
|
||||
immutable: config.immutable,
|
||||
worker_backoff_millis: config.worker_backoff_millis.get(),
|
||||
max_active_compactions: config.max_active_compactions.get(),
|
||||
max_active_compactions_cfg: Some(config.max_active_compactions.into()),
|
||||
catalog_transactions_until_checkpoint: config
|
||||
.catalog_transactions_until_checkpoint
|
||||
.get(),
|
||||
|
@ -43,6 +43,17 @@ impl From<LifecycleRules> for management::LifecycleRules {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<MaxActiveCompactions> for management::lifecycle_rules::MaxActiveCompactionsCfg {
|
||||
fn from(max: MaxActiveCompactions) -> Self {
|
||||
match max {
|
||||
MaxActiveCompactions::MaxActiveCompactions(n) => Self::MaxActiveCompactions(n.get()),
|
||||
MaxActiveCompactions::MaxActiveCompactionsCpuFraction { fraction, .. } => {
|
||||
Self::MaxActiveCompactionsCpuFraction(fraction)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<management::LifecycleRules> for LifecycleRules {
|
||||
type Error = FieldViolation;
|
||||
|
||||
|
@ -55,8 +66,10 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
|
|||
immutable: proto.immutable,
|
||||
worker_backoff_millis: NonZeroU64::new(proto.worker_backoff_millis)
|
||||
.unwrap_or_else(|| NonZeroU64::new(DEFAULT_WORKER_BACKOFF_MILLIS).unwrap()),
|
||||
max_active_compactions: NonZeroU32::new(proto.max_active_compactions)
|
||||
.unwrap_or_else(|| NonZeroU32::new(num_cpus::get() as u32).unwrap()), // default to num CPU threads
|
||||
max_active_compactions: proto
|
||||
.max_active_compactions_cfg
|
||||
.optional("max_active_compactions")?
|
||||
.unwrap_or_default(),
|
||||
catalog_transactions_until_checkpoint: NonZeroU64::new(
|
||||
proto.catalog_transactions_until_checkpoint,
|
||||
)
|
||||
|
@ -78,13 +91,31 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
|
|||
}
|
||||
}
|
||||
|
||||
impl TryFrom<management::lifecycle_rules::MaxActiveCompactionsCfg> for MaxActiveCompactions {
|
||||
type Error = FieldViolation;
|
||||
|
||||
fn try_from(
|
||||
value: management::lifecycle_rules::MaxActiveCompactionsCfg,
|
||||
) -> Result<Self, Self::Error> {
|
||||
use management::lifecycle_rules::MaxActiveCompactionsCfg::*;
|
||||
Ok(match value {
|
||||
MaxActiveCompactions(n) => {
|
||||
Self::MaxActiveCompactions(NonZeroU32::new(n).ok_or_else(|| FieldViolation {
|
||||
field: "max_active_compactions".to_string(),
|
||||
description: "must be non-zero".to_string(),
|
||||
})?)
|
||||
}
|
||||
MaxActiveCompactionsCpuFraction(fraction) => Self::new(fraction),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn lifecycle_rules() {
|
||||
#[allow(deprecated)]
|
||||
let protobuf = management::LifecycleRules {
|
||||
buffer_size_soft: 353,
|
||||
buffer_size_hard: 232,
|
||||
|
@ -92,7 +123,9 @@ mod tests {
|
|||
persist: true,
|
||||
immutable: true,
|
||||
worker_backoff_millis: 1000,
|
||||
max_active_compactions: 8,
|
||||
max_active_compactions_cfg: Some(
|
||||
management::lifecycle_rules::MaxActiveCompactionsCfg::MaxActiveCompactions(8),
|
||||
),
|
||||
catalog_transactions_until_checkpoint: 10,
|
||||
late_arrive_window_seconds: 23,
|
||||
persist_row_threshold: 57,
|
||||
|
@ -120,7 +153,10 @@ mod tests {
|
|||
assert_eq!(back.drop_non_persisted, protobuf.drop_non_persisted);
|
||||
assert_eq!(back.immutable, protobuf.immutable);
|
||||
assert_eq!(back.worker_backoff_millis, protobuf.worker_backoff_millis);
|
||||
assert_eq!(back.max_active_compactions, protobuf.max_active_compactions);
|
||||
assert_eq!(
|
||||
back.max_active_compactions_cfg,
|
||||
protobuf.max_active_compactions_cfg
|
||||
);
|
||||
assert_eq!(
|
||||
back.late_arrive_window_seconds,
|
||||
protobuf.late_arrive_window_seconds
|
||||
|
@ -143,5 +179,7 @@ mod tests {
|
|||
let protobuf = management::LifecycleRules::default();
|
||||
let config: LifecycleRules = protobuf.try_into().unwrap();
|
||||
assert_eq!(config, LifecycleRules::default());
|
||||
|
||||
assert_eq!(config.max_active_compactions.get(), num_cpus::get() as u32);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -652,6 +652,7 @@ mod tests {
|
|||
LockablePartition, PersistHandle,
|
||||
};
|
||||
use data_types::chunk_metadata::{ChunkAddr, ChunkStorage};
|
||||
use data_types::database_rules::MaxActiveCompactions::MaxActiveCompactions;
|
||||
use std::{
|
||||
cmp::max,
|
||||
collections::BTreeMap,
|
||||
|
@ -1366,7 +1367,7 @@ mod tests {
|
|||
let rules = LifecycleRules {
|
||||
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
|
||||
persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
|
||||
max_active_compactions: NonZeroU32::new(10).unwrap(),
|
||||
max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
@ -1450,7 +1451,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_compaction_limiter() {
|
||||
let rules = LifecycleRules {
|
||||
max_active_compactions: 2.try_into().unwrap(),
|
||||
max_active_compactions: MaxActiveCompactions(2.try_into().unwrap()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
@ -1500,7 +1501,7 @@ mod tests {
|
|||
persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
|
||||
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
|
||||
persist_age_threshold_seconds: NonZeroU32::new(10).unwrap(),
|
||||
max_active_compactions: NonZeroU32::new(10).unwrap(),
|
||||
max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()),
|
||||
..Default::default()
|
||||
};
|
||||
let now = Instant::now();
|
||||
|
|
|
@ -190,7 +190,7 @@ pub async fn command(url: String, config: Config) -> Result<()> {
|
|||
persist: command.persist,
|
||||
immutable: command.immutable,
|
||||
worker_backoff_millis: Default::default(),
|
||||
max_active_compactions: Default::default(),
|
||||
max_active_compactions_cfg: Default::default(),
|
||||
catalog_transactions_until_checkpoint: command
|
||||
.catalog_transactions_until_checkpoint
|
||||
.get(),
|
||||
|
|
|
@ -222,7 +222,9 @@ async fn test_create_get_update_database() {
|
|||
catalog_transactions_until_checkpoint: 13,
|
||||
late_arrive_window_seconds: 423,
|
||||
worker_backoff_millis: 15,
|
||||
max_active_compactions: 8,
|
||||
max_active_compactions_cfg: Some(
|
||||
lifecycle_rules::MaxActiveCompactionsCfg::MaxActiveCompactions(8),
|
||||
),
|
||||
persist_row_threshold: 342,
|
||||
persist_age_threshold_seconds: 700,
|
||||
mub_row_threshold: 1343,
|
||||
|
|
Loading…
Reference in New Issue