diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index 6b3a223f5f..b66ae79ea9 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -112,7 +112,6 @@ impl Partitioner for DatabaseRules { } } -pub const DEFAULT_MUTABLE_LINGER_SECONDS: u32 = 300; pub const DEFAULT_WORKER_BACKOFF_MILLIS: u64 = 1_000; pub const DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT: u64 = 100; pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000; @@ -123,13 +122,6 @@ pub const DEFAULT_LATE_ARRIVE_WINDOW_SECONDS: u32 = 5 * 60; /// Configures how data automatically flows through the system #[derive(Debug, Eq, PartialEq, Clone)] pub struct LifecycleRules { - /// A chunk of data within a partition that has been cold for writes for - /// this many seconds will be frozen and compacted (moved to the read - /// buffer) if the chunk is older than mutable_min_lifetime_seconds - /// - /// Represents the chunk transition open -> moving and closed -> moving - pub mutable_linger_seconds: NonZeroU32, - /// Once the total amount of buffered data in memory reaches this size start /// dropping data from memory pub buffer_size_soft: Option<NonZeroUsize>, @@ -155,7 +147,9 @@ pub struct LifecycleRules { /// After how many transactions should IOx write a new checkpoint? pub catalog_transactions_until_checkpoint: NonZeroU64, - /// The average timestamp skew across concurrent writers + /// Writers will generally have this amount of time to send late arriving writes + /// or this could be their clock skew. Once a partition hasn't received a write + /// for this period of time, it will be compacted and, if set, persisted. 
pub late_arrive_window_seconds: NonZeroU32, /// Maximum number of rows before triggering persistence @@ -175,7 +169,6 @@ impl LifecycleRules { impl Default for LifecycleRules { fn default() -> Self { Self { - mutable_linger_seconds: NonZeroU32::new(DEFAULT_MUTABLE_LINGER_SECONDS).unwrap(), buffer_size_soft: None, buffer_size_hard: None, drop_non_persisted: false, diff --git a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto index 0f9ad722ef..03864003ca 100644 --- a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto +++ b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto @@ -32,13 +32,6 @@ message PartitionTemplate { } message LifecycleRules { - // A chunk of data within a partition that has been cold for writes for this - // many seconds will be frozen and compacted (moved to the read buffer) - // if the chunk is older than mutable_min_lifetime_seconds - // - // Represents the chunk transition open -> moving and closed -> moving - uint32 mutable_linger_seconds = 1; - // Once the total amount of buffered data in memory reaches this size start // dropping data from memory uint64 buffer_size_soft = 4; @@ -69,7 +62,9 @@ message LifecycleRules { // If 0 / absent, this default to 100. uint64 catalog_transactions_until_checkpoint = 11; - // The average timestamp skew across concurrent writers + // Writers will generally have this amount of time to send late arriving writes + // or this could be their clock skew. Once a partition hasn't received a write + // for this period of time, it will be compacted and, if set, persisted. 
uint32 late_arrive_window_seconds = 12; // Maximum number of rows before triggering persistence diff --git a/generated_types/src/database_rules/lifecycle.rs b/generated_types/src/database_rules/lifecycle.rs index da1ffb87b7..816df882ba 100644 --- a/generated_types/src/database_rules/lifecycle.rs +++ b/generated_types/src/database_rules/lifecycle.rs @@ -3,9 +3,8 @@ use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize}; use data_types::database_rules::{ LifecycleRules, DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT, - DEFAULT_LATE_ARRIVE_WINDOW_SECONDS, DEFAULT_MUTABLE_LINGER_SECONDS, - DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS, DEFAULT_PERSIST_ROW_THRESHOLD, - DEFAULT_WORKER_BACKOFF_MILLIS, + DEFAULT_LATE_ARRIVE_WINDOW_SECONDS, DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS, + DEFAULT_PERSIST_ROW_THRESHOLD, DEFAULT_WORKER_BACKOFF_MILLIS, }; use crate::google::FieldViolation; @@ -14,7 +13,6 @@ use crate::influxdata::iox::management::v1 as management; impl From<LifecycleRules> for management::LifecycleRules { fn from(config: LifecycleRules) -> Self { Self { - mutable_linger_seconds: config.mutable_linger_seconds.get(), buffer_size_soft: config .buffer_size_soft .map(|x| x.get() as u64) @@ -42,12 +40,6 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules { fn try_from(proto: management::LifecycleRules) -> Result<Self, Self::Error> { Ok(Self { - mutable_linger_seconds: NonZeroU32::new(if proto.mutable_linger_seconds == 0 { - DEFAULT_MUTABLE_LINGER_SECONDS - } else { - proto.mutable_linger_seconds - }) - .unwrap(), buffer_size_soft: (proto.buffer_size_soft as usize).try_into().ok(), buffer_size_hard: (proto.buffer_size_hard as usize).try_into().ok(), drop_non_persisted: proto.drop_non_persisted, @@ -80,7 +72,6 @@ mod tests { #[test] fn lifecycle_rules() { let protobuf = management::LifecycleRules { - mutable_linger_seconds: 123, buffer_size_soft: 353, buffer_size_hard: 232, drop_non_persisted: true, @@ -96,10 +87,6 @@ let config: LifecycleRules = protobuf.clone().try_into().unwrap(); let back: management::LifecycleRules = 
config.clone().into(); - assert_eq!( - config.mutable_linger_seconds.get(), - protobuf.mutable_linger_seconds - ); assert_eq!( config.buffer_size_soft.unwrap().get(), protobuf.buffer_size_soft as usize @@ -111,7 +98,6 @@ mod tests { assert_eq!(config.drop_non_persisted, protobuf.drop_non_persisted); assert_eq!(config.immutable, protobuf.immutable); - assert_eq!(back.mutable_linger_seconds, protobuf.mutable_linger_seconds); assert_eq!(back.buffer_size_soft, protobuf.buffer_size_soft); assert_eq!(back.buffer_size_hard, protobuf.buffer_size_hard); assert_eq!(back.drop_non_persisted, protobuf.drop_non_persisted); diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index 02eab3f549..87d8999eae 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -552,7 +552,7 @@ fn can_move(rules: &LifecycleRules, chunk: &C, now: DateTime< match chunk.time_of_last_write() { Some(last_write) - if elapsed_seconds(now, last_write) >= rules.mutable_linger_seconds.get() => + if elapsed_seconds(now, last_write) >= rules.late_arrive_window_seconds.get() => { true } @@ -1031,9 +1031,9 @@ mod tests { #[test] fn test_can_move() { - // If only mutable_linger set can move a chunk once passed + // If only late_arrival set can move a chunk once passed let rules = LifecycleRules { - mutable_linger_seconds: NonZeroU32::new(10).unwrap(), + late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), ..Default::default() }; let chunk = TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer); @@ -1130,9 +1130,9 @@ mod tests { } #[test] - fn test_mutable_linger() { + fn test_late_arrival() { let rules = LifecycleRules { - mutable_linger_seconds: NonZeroU32::new(10).unwrap(), + late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), ..Default::default() }; let chunks = vec![ @@ -1176,7 +1176,7 @@ mod tests { async fn test_backoff() { let mut registry = TaskRegistry::new(); let rules = LifecycleRules { - mutable_linger_seconds: NonZeroU32::new(100).unwrap(), + 
late_arrive_window_seconds: NonZeroU32::new(100).unwrap(), ..Default::default() }; let db = TestDb::new(rules, vec![]); @@ -1320,7 +1320,7 @@ mod tests { #[test] fn test_compact() { let rules = LifecycleRules { - mutable_linger_seconds: NonZeroU32::new(10).unwrap(), + late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), persist_row_threshold: NonZeroUsize::new(1_000).unwrap(), ..Default::default() }; @@ -1411,7 +1411,6 @@ mod tests { #[test] fn test_persist() { let rules = LifecycleRules { - mutable_linger_seconds: NonZeroU32::new(10).unwrap(), persist: true, persist_row_threshold: NonZeroUsize::new(1_000).unwrap(), late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), @@ -1439,7 +1438,7 @@ mod tests { .with_persistence(1_000, now, from_secs(20)), // Writes too old => persist TestPartition::new(vec![ - // Should split open chunks irrespective of mutable_linger_seconds + // Should split open chunks TestChunk::new(4, Some(0), Some(20), ChunkStorage::OpenMutableBuffer) .with_min_timestamp(from_secs(10)), TestChunk::new(5, Some(0), Some(0), ChunkStorage::ReadBuffer) @@ -1508,7 +1507,7 @@ mod tests { #[test] fn test_moves_open() { let rules = LifecycleRules { - mutable_linger_seconds: NonZeroU32::new(10).unwrap(), + late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), ..Default::default() }; let chunks = vec![TestChunk::new( @@ -1528,7 +1527,7 @@ mod tests { #[test] fn test_moves_closed() { let rules = LifecycleRules { - mutable_linger_seconds: NonZeroU32::new(10).unwrap(), + late_arrive_window_seconds: NonZeroU32::new(10).unwrap(), ..Default::default() }; let chunks = vec![TestChunk::new( diff --git a/src/commands/database.rs b/src/commands/database.rs index 01535ba479..5fa2892ed1 100644 --- a/src/commands/database.rs +++ b/src/commands/database.rs @@ -74,15 +74,6 @@ pub struct Config { struct Create { /// The name of the database name: String, - - /// A chunk of data within a partition that has been cold for writes for - /// this many seconds will be 
frozen and compacted (moved to the read - /// buffer) if the chunk is older than mutable_min_lifetime_seconds - /// - /// Represents the chunk transition open -> moving and closed -> moving - #[structopt(long, default_value = "300")] // 5 minutes - mutable_linger_seconds: u32, - /// Once the total amount of buffered data in memory reaches this size start /// dropping data from memory based on the drop_order #[structopt(long, default_value = "52428800")] // 52428800 = 50*1024*1024 @@ -110,7 +101,9 @@ struct Create { #[structopt(long, default_value = "100", parse(try_from_str))] catalog_transactions_until_checkpoint: NonZeroU64, - /// The average timestamp skew across concurrent writers + /// Writers will generally have this amount of time to send late arriving writes + /// or this could be their clock skew. Once a partition hasn't received a write + /// for this period of time, it will be compacted and, if set, persisted. #[structopt(long, default_value = "300")] late_arrive_window_seconds: u32, @@ -180,7 +173,6 @@ pub async fn command(url: String, config: Config) -> Result<()> { let rules = DatabaseRules { name: command.name, lifecycle_rules: Some(LifecycleRules { - mutable_linger_seconds: command.mutable_linger_seconds, buffer_size_soft: command.buffer_size_soft as _, buffer_size_hard: command.buffer_size_hard as _, drop_non_persisted: command.drop_non_persisted, diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 0943afca1c..f0fa002222 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -14,7 +14,6 @@ use super::scenario::{ create_readable_database, create_two_partition_database, create_unreadable_database, rand_name, }; use crate::common::server_fixture::ServerFixture; -use data_types::database_rules::DEFAULT_MUTABLE_LINGER_SECONDS; use std::time::Instant; use tonic::Code; @@ -244,11 +243,6 @@ async fn test_create_get_update_database() { ignore_errors: true, 
..Default::default() })); - rules.lifecycle_rules = Some(LifecycleRules { - mutable_linger_seconds: DEFAULT_MUTABLE_LINGER_SECONDS, - ..rules.lifecycle_rules.unwrap() - }); - let updated_rules = client .update_database(rules.clone()) .await @@ -731,7 +725,7 @@ async fn test_chunk_lifecycle() { .create_database(DatabaseRules { name: db_name.clone(), lifecycle_rules: Some(LifecycleRules { - mutable_linger_seconds: 1, + late_arrive_window_seconds: 1, ..Default::default() }), ..Default::default() diff --git a/tests/end_to_end_cases/scenario.rs b/tests/end_to_end_cases/scenario.rs index bd3fec1cef..28788e5f81 100644 --- a/tests/end_to_end_cases/scenario.rs +++ b/tests/end_to_end_cases/scenario.rs @@ -347,7 +347,6 @@ pub async fn create_quickly_persisting_database( }], }), lifecycle_rules: Some(LifecycleRules { - mutable_linger_seconds: 1, buffer_size_soft: 512 * 1024, // 512K buffer_size_hard: 10 * 1024 * 1024, // 10MB persist: true,