Merge pull request #1915 from influxdata/fixnonelinger
fix: Ensure mutable_linger_seconds can not be Nonepull/24376/head
commit
7b18c29d32
|
@ -106,6 +106,7 @@ impl Partitioner for DatabaseRules {
|
|||
}
|
||||
}
|
||||
|
||||
pub const DEFAULT_MUTABLE_LINGER_SECONDS: u32 = 300;
|
||||
pub const DEFAULT_WORKER_BACKOFF_MILLIS: u64 = 1_000;
|
||||
pub const DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT: u64 = 100;
|
||||
pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;
|
||||
|
@ -121,7 +122,7 @@ pub struct LifecycleRules {
|
|||
/// buffer) if the chunk is older than mutable_min_lifetime_seconds
|
||||
///
|
||||
/// Represents the chunk transition open -> moving and closed -> moving
|
||||
pub mutable_linger_seconds: Option<NonZeroU32>,
|
||||
pub mutable_linger_seconds: NonZeroU32,
|
||||
|
||||
/// A chunk of data within a partition is guaranteed to remain mutable
|
||||
/// for at least this number of seconds unless it exceeds the mutable_size_threshold
|
||||
|
@ -172,7 +173,7 @@ impl LifecycleRules {
|
|||
impl Default for LifecycleRules {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
mutable_linger_seconds: None,
|
||||
mutable_linger_seconds: NonZeroU32::new(DEFAULT_MUTABLE_LINGER_SECONDS).unwrap(),
|
||||
mutable_minimum_age_seconds: None,
|
||||
buffer_size_soft: None,
|
||||
buffer_size_hard: None,
|
||||
|
|
|
@ -3,8 +3,9 @@ use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
|
|||
|
||||
use data_types::database_rules::{
|
||||
LifecycleRules, DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
|
||||
DEFAULT_LATE_ARRIVE_WINDOW_SECONDS, DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS,
|
||||
DEFAULT_PERSIST_ROW_THRESHOLD, DEFAULT_WORKER_BACKOFF_MILLIS,
|
||||
DEFAULT_LATE_ARRIVE_WINDOW_SECONDS, DEFAULT_MUTABLE_LINGER_SECONDS,
|
||||
DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS, DEFAULT_PERSIST_ROW_THRESHOLD,
|
||||
DEFAULT_WORKER_BACKOFF_MILLIS,
|
||||
};
|
||||
|
||||
use crate::google::FieldViolation;
|
||||
|
@ -13,10 +14,7 @@ use crate::influxdata::iox::management::v1 as management;
|
|||
impl From<LifecycleRules> for management::LifecycleRules {
|
||||
fn from(config: LifecycleRules) -> Self {
|
||||
Self {
|
||||
mutable_linger_seconds: config
|
||||
.mutable_linger_seconds
|
||||
.map(Into::into)
|
||||
.unwrap_or_default(),
|
||||
mutable_linger_seconds: config.mutable_linger_seconds.get(),
|
||||
mutable_minimum_age_seconds: config
|
||||
.mutable_minimum_age_seconds
|
||||
.map(Into::into)
|
||||
|
@ -48,7 +46,12 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
|
|||
|
||||
fn try_from(proto: management::LifecycleRules) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
mutable_linger_seconds: proto.mutable_linger_seconds.try_into().ok(),
|
||||
mutable_linger_seconds: NonZeroU32::new(if proto.mutable_linger_seconds == 0 {
|
||||
DEFAULT_MUTABLE_LINGER_SECONDS
|
||||
} else {
|
||||
proto.mutable_linger_seconds
|
||||
})
|
||||
.unwrap(),
|
||||
mutable_minimum_age_seconds: proto.mutable_minimum_age_seconds.try_into().ok(),
|
||||
buffer_size_soft: (proto.buffer_size_soft as usize).try_into().ok(),
|
||||
buffer_size_hard: (proto.buffer_size_hard as usize).try_into().ok(),
|
||||
|
@ -100,7 +103,7 @@ mod tests {
|
|||
let back: management::LifecycleRules = config.clone().into();
|
||||
|
||||
assert_eq!(
|
||||
config.mutable_linger_seconds.unwrap().get(),
|
||||
config.mutable_linger_seconds.get(),
|
||||
protobuf.mutable_linger_seconds
|
||||
);
|
||||
assert_eq!(
|
||||
|
|
|
@ -483,8 +483,10 @@ fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: DateTime<
|
|||
return true;
|
||||
}
|
||||
|
||||
match (rules.mutable_linger_seconds, chunk.time_of_last_write()) {
|
||||
(Some(linger), Some(last_write)) if elapsed_seconds(now, last_write) >= linger.get() => {
|
||||
match chunk.time_of_last_write() {
|
||||
Some(last_write)
|
||||
if elapsed_seconds(now, last_write) >= rules.mutable_linger_seconds.get() =>
|
||||
{
|
||||
match (
|
||||
rules.mutable_minimum_age_seconds,
|
||||
chunk.time_of_first_write(),
|
||||
|
@ -499,8 +501,7 @@ fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: DateTime<
|
|||
}
|
||||
}
|
||||
|
||||
// Disable movement if no mutable_linger set,
|
||||
// or the chunk is empty, or the linger hasn't expired
|
||||
// Disable movement the chunk is empty, or the linger hasn't expired
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
@ -889,14 +890,9 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_can_move() {
|
||||
// Cannot move by default
|
||||
let rules = LifecycleRules::default();
|
||||
let chunk = TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer);
|
||||
assert!(!can_move(&rules, &chunk, from_secs(20)));
|
||||
|
||||
// If only mutable_linger set can move a chunk once passed
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
let chunk = TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer);
|
||||
|
@ -910,7 +906,7 @@ mod tests {
|
|||
|
||||
// If mutable_minimum_age_seconds set must also take this into account
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
mutable_minimum_age_seconds: Some(NonZeroU32::new(60).unwrap()),
|
||||
..Default::default()
|
||||
};
|
||||
|
@ -1022,7 +1018,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_mutable_linger() {
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
let chunks = vec![
|
||||
|
@ -1066,7 +1062,7 @@ mod tests {
|
|||
async fn test_backoff() {
|
||||
let mut registry = TaskRegistry::new();
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: Some(NonZeroU32::new(100).unwrap()),
|
||||
mutable_linger_seconds: NonZeroU32::new(100).unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
let db = TestDb::new(rules, vec![]);
|
||||
|
@ -1093,7 +1089,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_minimum_age() {
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
mutable_minimum_age_seconds: Some(NonZeroU32::new(60).unwrap()),
|
||||
..Default::default()
|
||||
};
|
||||
|
@ -1251,7 +1247,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_compact() {
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
|
@ -1342,7 +1338,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_persist() {
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
persist: true,
|
||||
persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
|
||||
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
|
||||
|
@ -1406,7 +1402,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_moves_closed() {
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: Some(NonZeroU32::new(10).unwrap()),
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
mutable_minimum_age_seconds: Some(NonZeroU32::new(60).unwrap()),
|
||||
..Default::default()
|
||||
};
|
||||
|
|
|
@ -14,6 +14,7 @@ use super::scenario::{
|
|||
create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
|
||||
};
|
||||
use crate::common::server_fixture::ServerFixture;
|
||||
use data_types::database_rules::DEFAULT_MUTABLE_LINGER_SECONDS;
|
||||
use std::time::Instant;
|
||||
use tonic::Code;
|
||||
|
||||
|
@ -243,6 +244,10 @@ async fn test_create_get_update_database() {
|
|||
ignore_errors: true,
|
||||
..Default::default()
|
||||
}));
|
||||
rules.lifecycle_rules = Some(LifecycleRules {
|
||||
mutable_linger_seconds: DEFAULT_MUTABLE_LINGER_SECONDS,
|
||||
..rules.lifecycle_rules.unwrap()
|
||||
});
|
||||
|
||||
let updated_rules = client
|
||||
.update_database(rules.clone())
|
||||
|
|
Loading…
Reference in New Issue