feat: add parquet cache size setting to database rules
parent
61da0fe4df
commit
297e059085
|
@ -166,6 +166,9 @@ pub struct LifecycleRules {
|
|||
|
||||
/// Maximum number of rows to buffer in a MUB chunk before compacting it
|
||||
pub mub_row_threshold: NonZeroUsize,
|
||||
|
||||
/// Use up to this amount of space in bytes for caching Parquet files
|
||||
pub parquet_cache_limit: Option<NonZeroUsize>,
|
||||
}
|
||||
|
||||
impl LifecycleRules {
|
||||
|
@ -195,6 +198,7 @@ impl Default for LifecycleRules {
|
|||
persist_age_threshold_seconds: NonZeroU32::new(DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS)
|
||||
.unwrap(),
|
||||
mub_row_threshold: NonZeroUsize::new(DEFAULT_MUB_ROW_THRESHOLD).unwrap(),
|
||||
parquet_cache_limit: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,6 +82,13 @@ message LifecycleRules {
|
|||
// If 0, compactions are limited to the default number.
|
||||
// See data_types::database_rules::DEFAULT_MAX_ACTIVE_COMPACTIONS
|
||||
uint32 max_active_compactions = 16;
|
||||
|
||||
// Use up to this amount of space in bytes for caching Parquet files
|
||||
ParquetCacheLimit parquet_cache_limit = 17;
|
||||
}
|
||||
|
||||
message ParquetCacheLimit {
|
||||
uint64 value = 1;
|
||||
}
|
||||
|
||||
message DatabaseRules {
|
||||
|
|
|
@ -10,6 +10,7 @@ use data_types::database_rules::{
|
|||
|
||||
use crate::google::FieldViolation;
|
||||
use crate::influxdata::iox::management::v1 as management;
|
||||
use crate::influxdata::iox::management::v1::ParquetCacheLimit;
|
||||
|
||||
impl From<LifecycleRules> for management::LifecycleRules {
|
||||
fn from(config: LifecycleRules) -> Self {
|
||||
|
@ -35,6 +36,9 @@ impl From<LifecycleRules> for management::LifecycleRules {
|
|||
persist_row_threshold: config.persist_row_threshold.get() as u64,
|
||||
persist_age_threshold_seconds: config.persist_age_threshold_seconds.get(),
|
||||
mub_row_threshold: config.mub_row_threshold.get() as u64,
|
||||
parquet_cache_limit: config.parquet_cache_limit.map(|x| ParquetCacheLimit {
|
||||
value: x.get() as u64,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -43,6 +47,11 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
|
|||
type Error = FieldViolation;
|
||||
|
||||
fn try_from(proto: management::LifecycleRules) -> Result<Self, Self::Error> {
|
||||
let parquet_cache_limit = match proto.parquet_cache_limit {
|
||||
Some(l) => (l.value as usize).try_into().ok(),
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
buffer_size_soft: (proto.buffer_size_soft as usize).try_into().ok(),
|
||||
buffer_size_hard: (proto.buffer_size_hard as usize).try_into().ok(),
|
||||
|
@ -69,6 +78,7 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
|
|||
.unwrap_or_else(|| NonZeroU32::new(DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS).unwrap()),
|
||||
mub_row_threshold: NonZeroUsize::new(proto.mub_row_threshold as usize)
|
||||
.unwrap_or_else(|| NonZeroUsize::new(DEFAULT_MUB_ROW_THRESHOLD).unwrap()),
|
||||
parquet_cache_limit,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -93,6 +103,7 @@ mod tests {
|
|||
persist_row_threshold: 57,
|
||||
persist_age_threshold_seconds: 23,
|
||||
mub_row_threshold: 3454,
|
||||
parquet_cache_limit: Some(ParquetCacheLimit { value: 10 }),
|
||||
};
|
||||
|
||||
let config: LifecycleRules = protobuf.clone().try_into().unwrap();
|
||||
|
@ -125,6 +136,11 @@ mod tests {
|
|||
protobuf.persist_age_threshold_seconds
|
||||
);
|
||||
assert_eq!(back.mub_row_threshold, protobuf.mub_row_threshold);
|
||||
assert_eq!(
|
||||
config.parquet_cache_limit.unwrap().get(),
|
||||
protobuf.parquet_cache_limit.as_ref().unwrap().value as usize
|
||||
);
|
||||
assert_eq!(back.parquet_cache_limit, protobuf.parquet_cache_limit);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -13,6 +13,7 @@ use influxdb_iox_client::{
|
|||
},
|
||||
write::{self, WriteError},
|
||||
};
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
mod catalog;
|
||||
mod chunk;
|
||||
|
@ -119,6 +120,10 @@ struct Create {
|
|||
/// Maximum number of rows to buffer in a MUB chunk before compacting it
|
||||
#[structopt(long, default_value = "100000")]
|
||||
mub_row_threshold: u64,
|
||||
|
||||
/// Use up to this amount of space in bytes for caching Parquet files
|
||||
#[structopt(long, parse(try_from_str))]
|
||||
pub parquet_cache_limit: Option<NonZeroUsize>,
|
||||
}
|
||||
|
||||
/// Get list of databases
|
||||
|
@ -193,6 +198,7 @@ pub async fn command(url: String, config: Config) -> Result<()> {
|
|||
persist_row_threshold: command.persist_row_threshold,
|
||||
persist_age_threshold_seconds: command.persist_age_threshold_seconds,
|
||||
mub_row_threshold: command.mub_row_threshold,
|
||||
parquet_cache_limit: command.parquet_cache_limit.map(|l| ParquetCacheLimit{value: l.get() as u64}),
|
||||
}),
|
||||
|
||||
// Default to hourly partitions
|
||||
|
|
Loading…
Reference in New Issue