refactor: remove mutable_linger_seconds from lifecycle
The interplay between mutable_linger_seconds, late_arrive_window and persist_age_threshold_seconds can be tricky to reason about. I realized that the lifecycle rules can be simplified by removing mutable_linger_seconds and instead using late_arrive_window_seconds for the same purpose. Semantically, they basically mean the same thing. We want to give data around this amount of time to arrive before the system persists it, which gives it more of an opportunity to persist non-overlapping data. When a partition goes cold for writes, after we've waited past this window, we should compact and persist that partition. This removes one unnecessary knob from the lifecycle configuration and also removes the potential for conflicting configuration options.
pull/24376/head
parent
9534220035
commit
0c8c81a321
|
@ -112,7 +112,6 @@ impl Partitioner for DatabaseRules {
|
|||
}
|
||||
}
|
||||
|
||||
pub const DEFAULT_MUTABLE_LINGER_SECONDS: u32 = 300;
|
||||
pub const DEFAULT_WORKER_BACKOFF_MILLIS: u64 = 1_000;
|
||||
pub const DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT: u64 = 100;
|
||||
pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;
|
||||
|
@ -123,13 +122,6 @@ pub const DEFAULT_LATE_ARRIVE_WINDOW_SECONDS: u32 = 5 * 60;
|
|||
/// Configures how data automatically flows through the system
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub struct LifecycleRules {
|
||||
/// A chunk of data within a partition that has been cold for writes for
|
||||
/// this many seconds will be frozen and compacted (moved to the read
|
||||
/// buffer) if the chunk is older than mutable_min_lifetime_seconds
|
||||
///
|
||||
/// Represents the chunk transition open -> moving and closed -> moving
|
||||
pub mutable_linger_seconds: NonZeroU32,
|
||||
|
||||
/// Once the total amount of buffered data in memory reaches this size start
|
||||
/// dropping data from memory
|
||||
pub buffer_size_soft: Option<NonZeroUsize>,
|
||||
|
@ -155,7 +147,9 @@ pub struct LifecycleRules {
|
|||
/// After how many transactions should IOx write a new checkpoint?
|
||||
pub catalog_transactions_until_checkpoint: NonZeroU64,
|
||||
|
||||
/// The average timestamp skew across concurrent writers
|
||||
/// Writers will generally have this amount of time to send late arriving writes
|
||||
/// or this could be their clock skew. Once a partition hasn't received a write
|
||||
/// for this period of time, it will be compacted and, if set, persisted.
|
||||
pub late_arrive_window_seconds: NonZeroU32,
|
||||
|
||||
/// Maximum number of rows before triggering persistence
|
||||
|
@ -175,7 +169,6 @@ impl LifecycleRules {
|
|||
impl Default for LifecycleRules {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
mutable_linger_seconds: NonZeroU32::new(DEFAULT_MUTABLE_LINGER_SECONDS).unwrap(),
|
||||
buffer_size_soft: None,
|
||||
buffer_size_hard: None,
|
||||
drop_non_persisted: false,
|
||||
|
|
|
@ -32,13 +32,6 @@ message PartitionTemplate {
|
|||
}
|
||||
|
||||
message LifecycleRules {
|
||||
// A chunk of data within a partition that has been cold for writes for this
|
||||
// many seconds will be frozen and compacted (moved to the read buffer)
|
||||
// if the chunk is older than mutable_min_lifetime_seconds
|
||||
//
|
||||
// Represents the chunk transition open -> moving and closed -> moving
|
||||
uint32 mutable_linger_seconds = 1;
|
||||
|
||||
// Once the total amount of buffered data in memory reaches this size start
|
||||
// dropping data from memory
|
||||
uint64 buffer_size_soft = 4;
|
||||
|
@ -69,7 +62,9 @@ message LifecycleRules {
|
|||
// If 0 / absent, this default to 100.
|
||||
uint64 catalog_transactions_until_checkpoint = 11;
|
||||
|
||||
// The average timestamp skew across concurrent writers
|
||||
// Writers will generally have this amount of time to send late arriving writes
|
||||
// or this could be their clock skew. Once a partition hasn't received a write
|
||||
// for this period of time, it will be compacted and, if set, persisted.
|
||||
uint32 late_arrive_window_seconds = 12;
|
||||
|
||||
// Maximum number of rows before triggering persistence
|
||||
|
|
|
@ -3,9 +3,8 @@ use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
|
|||
|
||||
use data_types::database_rules::{
|
||||
LifecycleRules, DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
|
||||
DEFAULT_LATE_ARRIVE_WINDOW_SECONDS, DEFAULT_MUTABLE_LINGER_SECONDS,
|
||||
DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS, DEFAULT_PERSIST_ROW_THRESHOLD,
|
||||
DEFAULT_WORKER_BACKOFF_MILLIS,
|
||||
DEFAULT_LATE_ARRIVE_WINDOW_SECONDS, DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS,
|
||||
DEFAULT_PERSIST_ROW_THRESHOLD, DEFAULT_WORKER_BACKOFF_MILLIS,
|
||||
};
|
||||
|
||||
use crate::google::FieldViolation;
|
||||
|
@ -14,7 +13,6 @@ use crate::influxdata::iox::management::v1 as management;
|
|||
impl From<LifecycleRules> for management::LifecycleRules {
|
||||
fn from(config: LifecycleRules) -> Self {
|
||||
Self {
|
||||
mutable_linger_seconds: config.mutable_linger_seconds.get(),
|
||||
buffer_size_soft: config
|
||||
.buffer_size_soft
|
||||
.map(|x| x.get() as u64)
|
||||
|
@ -42,12 +40,6 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
|
|||
|
||||
fn try_from(proto: management::LifecycleRules) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
mutable_linger_seconds: NonZeroU32::new(if proto.mutable_linger_seconds == 0 {
|
||||
DEFAULT_MUTABLE_LINGER_SECONDS
|
||||
} else {
|
||||
proto.mutable_linger_seconds
|
||||
})
|
||||
.unwrap(),
|
||||
buffer_size_soft: (proto.buffer_size_soft as usize).try_into().ok(),
|
||||
buffer_size_hard: (proto.buffer_size_hard as usize).try_into().ok(),
|
||||
drop_non_persisted: proto.drop_non_persisted,
|
||||
|
@ -80,7 +72,6 @@ mod tests {
|
|||
#[test]
|
||||
fn lifecycle_rules() {
|
||||
let protobuf = management::LifecycleRules {
|
||||
mutable_linger_seconds: 123,
|
||||
buffer_size_soft: 353,
|
||||
buffer_size_hard: 232,
|
||||
drop_non_persisted: true,
|
||||
|
@ -96,10 +87,6 @@ mod tests {
|
|||
let config: LifecycleRules = protobuf.clone().try_into().unwrap();
|
||||
let back: management::LifecycleRules = config.clone().into();
|
||||
|
||||
assert_eq!(
|
||||
config.mutable_linger_seconds.get(),
|
||||
protobuf.mutable_linger_seconds
|
||||
);
|
||||
assert_eq!(
|
||||
config.buffer_size_soft.unwrap().get(),
|
||||
protobuf.buffer_size_soft as usize
|
||||
|
@ -111,7 +98,6 @@ mod tests {
|
|||
assert_eq!(config.drop_non_persisted, protobuf.drop_non_persisted);
|
||||
assert_eq!(config.immutable, protobuf.immutable);
|
||||
|
||||
assert_eq!(back.mutable_linger_seconds, protobuf.mutable_linger_seconds);
|
||||
assert_eq!(back.buffer_size_soft, protobuf.buffer_size_soft);
|
||||
assert_eq!(back.buffer_size_hard, protobuf.buffer_size_hard);
|
||||
assert_eq!(back.drop_non_persisted, protobuf.drop_non_persisted);
|
||||
|
|
|
@ -552,7 +552,7 @@ fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: DateTime<
|
|||
|
||||
match chunk.time_of_last_write() {
|
||||
Some(last_write)
|
||||
if elapsed_seconds(now, last_write) >= rules.mutable_linger_seconds.get() =>
|
||||
if elapsed_seconds(now, last_write) >= rules.late_arrive_window_seconds.get() =>
|
||||
{
|
||||
true
|
||||
}
|
||||
|
@ -1031,9 +1031,9 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_can_move() {
|
||||
// If only mutable_linger set can move a chunk once passed
|
||||
// If only late_arrival set can move a chunk once passed
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
let chunk = TestChunk::new(0, Some(0), Some(0), ChunkStorage::OpenMutableBuffer);
|
||||
|
@ -1130,9 +1130,9 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_mutable_linger() {
|
||||
fn test_late_arrival() {
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
let chunks = vec![
|
||||
|
@ -1176,7 +1176,7 @@ mod tests {
|
|||
async fn test_backoff() {
|
||||
let mut registry = TaskRegistry::new();
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: NonZeroU32::new(100).unwrap(),
|
||||
late_arrive_window_seconds: NonZeroU32::new(100).unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
let db = TestDb::new(rules, vec![]);
|
||||
|
@ -1320,7 +1320,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_compact() {
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
|
||||
persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
|
@ -1411,7 +1411,6 @@ mod tests {
|
|||
#[test]
|
||||
fn test_persist() {
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
persist: true,
|
||||
persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
|
||||
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
|
||||
|
@ -1439,7 +1438,7 @@ mod tests {
|
|||
.with_persistence(1_000, now, from_secs(20)),
|
||||
// Writes too old => persist
|
||||
TestPartition::new(vec![
|
||||
// Should split open chunks irrespective of mutable_linger_seconds
|
||||
// Should split open chunks
|
||||
TestChunk::new(4, Some(0), Some(20), ChunkStorage::OpenMutableBuffer)
|
||||
.with_min_timestamp(from_secs(10)),
|
||||
TestChunk::new(5, Some(0), Some(0), ChunkStorage::ReadBuffer)
|
||||
|
@ -1508,7 +1507,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_moves_open() {
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
let chunks = vec![TestChunk::new(
|
||||
|
@ -1528,7 +1527,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_moves_closed() {
|
||||
let rules = LifecycleRules {
|
||||
mutable_linger_seconds: NonZeroU32::new(10).unwrap(),
|
||||
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
|
||||
..Default::default()
|
||||
};
|
||||
let chunks = vec![TestChunk::new(
|
||||
|
|
|
@ -74,15 +74,6 @@ pub struct Config {
|
|||
struct Create {
|
||||
/// The name of the database
|
||||
name: String,
|
||||
|
||||
/// A chunk of data within a partition that has been cold for writes for
|
||||
/// this many seconds will be frozen and compacted (moved to the read
|
||||
/// buffer) if the chunk is older than mutable_min_lifetime_seconds
|
||||
///
|
||||
/// Represents the chunk transition open -> moving and closed -> moving
|
||||
#[structopt(long, default_value = "300")] // 5 minutes
|
||||
mutable_linger_seconds: u32,
|
||||
|
||||
/// Once the total amount of buffered data in memory reaches this size start
|
||||
/// dropping data from memory based on the drop_order
|
||||
#[structopt(long, default_value = "52428800")] // 52428800 = 50*1024*1024
|
||||
|
@ -110,7 +101,9 @@ struct Create {
|
|||
#[structopt(long, default_value = "100", parse(try_from_str))]
|
||||
catalog_transactions_until_checkpoint: NonZeroU64,
|
||||
|
||||
/// The average timestamp skew across concurrent writers
|
||||
/// Writers will generally have this amount of time to send late arriving writes
|
||||
/// or this could be their clock skew. Once a partition hasn't received a write
|
||||
/// for this period of time, it will be compacted and, if set, persisted.
|
||||
#[structopt(long, default_value = "300")]
|
||||
late_arrive_window_seconds: u32,
|
||||
|
||||
|
@ -180,7 +173,6 @@ pub async fn command(url: String, config: Config) -> Result<()> {
|
|||
let rules = DatabaseRules {
|
||||
name: command.name,
|
||||
lifecycle_rules: Some(LifecycleRules {
|
||||
mutable_linger_seconds: command.mutable_linger_seconds,
|
||||
buffer_size_soft: command.buffer_size_soft as _,
|
||||
buffer_size_hard: command.buffer_size_hard as _,
|
||||
drop_non_persisted: command.drop_non_persisted,
|
||||
|
|
|
@ -14,7 +14,6 @@ use super::scenario::{
|
|||
create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
|
||||
};
|
||||
use crate::common::server_fixture::ServerFixture;
|
||||
use data_types::database_rules::DEFAULT_MUTABLE_LINGER_SECONDS;
|
||||
use std::time::Instant;
|
||||
use tonic::Code;
|
||||
|
||||
|
@ -244,11 +243,6 @@ async fn test_create_get_update_database() {
|
|||
ignore_errors: true,
|
||||
..Default::default()
|
||||
}));
|
||||
rules.lifecycle_rules = Some(LifecycleRules {
|
||||
mutable_linger_seconds: DEFAULT_MUTABLE_LINGER_SECONDS,
|
||||
..rules.lifecycle_rules.unwrap()
|
||||
});
|
||||
|
||||
let updated_rules = client
|
||||
.update_database(rules.clone())
|
||||
.await
|
||||
|
@ -731,7 +725,7 @@ async fn test_chunk_lifecycle() {
|
|||
.create_database(DatabaseRules {
|
||||
name: db_name.clone(),
|
||||
lifecycle_rules: Some(LifecycleRules {
|
||||
mutable_linger_seconds: 1,
|
||||
late_arrive_window_seconds: 1,
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
|
|
|
@ -347,7 +347,6 @@ pub async fn create_quickly_persisting_database(
|
|||
}],
|
||||
}),
|
||||
lifecycle_rules: Some(LifecycleRules {
|
||||
mutable_linger_seconds: 1,
|
||||
buffer_size_soft: 512 * 1024, // 512K
|
||||
buffer_size_hard: 10 * 1024 * 1024, // 10MB
|
||||
persist: true,
|
||||
|
|
Loading…
Reference in New Issue