From 91f5478012c3ffc935dcb68d07cbe177e39470f0 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 1 Jul 2021 14:58:29 -0400 Subject: [PATCH 1/2] feat: remove MUB size threshold Removes the MUB chunk close based on size. Also add a check in lifecycle policy to move if the MUB chunk crosses a default row count threshold. --- data_types/src/database_rules.rs | 3 +- lifecycle/src/policy.rs | 8 ++--- server/src/db.rs | 56 ++------------------------------ 3 files changed, 8 insertions(+), 59 deletions(-) diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index 86b4604a6e..7d5a077bb1 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -108,7 +108,8 @@ impl Partitioner for DatabaseRules { pub const DEFAULT_WORKER_BACKOFF_MILLIS: u64 = 1_000; pub const DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT: u64 = 100; -pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 100_000; +pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000; +pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 1_000_000; pub const DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS: u32 = 30 * 60; pub const DEFAULT_LATE_ARRIVE_WINDOW_SECONDS: u32 = 5 * 60; diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index 7f196d7942..79bf008772 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -6,7 +6,7 @@ use chrono::{DateTime, Utc}; use futures::future::BoxFuture; use data_types::chunk_metadata::ChunkStorage; -use data_types::database_rules::LifecycleRules; +use data_types::database_rules::{LifecycleRules, DEFAULT_MUB_ROW_THRESHOLD}; use observability_deps::tracing::{debug, info, warn}; use tracker::TaskTracker; @@ -465,9 +465,9 @@ fn can_move(rules: &LifecycleRules, chunk: &C, now: DateTime< } } - // Disable movement if no mutable_linger set, - // or the chunk is empty, or the linger hasn't expired - _ => false, + // Move it if over the row count threshold or don't move since it's either empty or the linger time hasn't expired. + // TODO: make this a configuration variable + _ => chunk.row_count() >= DEFAULT_MUB_ROW_THRESHOLD, } } diff --git a/server/src/db.rs b/server/src/db.rs index c0ce21d685..4f3d2313ce 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -42,7 +42,6 @@ use snafu::{ensure, ResultExt, Snafu}; use std::{ any::Any, collections::HashMap, - num::NonZeroUsize, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -595,7 +594,6 @@ impl Db { pub fn store_sequenced_entry(&self, sequenced_entry: Arc) -> Result<()> { // Get all needed database rule values, then release the lock let rules = self.rules.read(); - let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold; let immutable = rules.lifecycle_rules.immutable; let buffer_size_hard = rules.lifecycle_rules.buffer_size_hard; let late_arrival_window = rules.lifecycle_rules.late_arrive_window(); @@ -661,8 +659,6 @@ impl Db { } continue; }; - - check_chunk_closed(&mut *chunk, mutable_size_threshold); } None => { let metrics = self.metrics_registry.register_domain_with_labels( @@ -687,8 +683,7 @@ impl Db { continue; } - let new_chunk = partition.create_open_chunk(mb_chunk); - check_chunk_closed(&mut *new_chunk.write(), mutable_size_threshold); + let _ = partition.create_open_chunk(mb_chunk); } }; @@ -724,19 +719,6 @@ impl Db { } } -/// Check if the given chunk should be closed based on the the MutableBuffer size threshold. -fn check_chunk_closed(chunk: &mut CatalogChunk, mutable_size_threshold: Option) { - if let Some(threshold) = mutable_size_threshold { - if let Ok(mb_chunk) = chunk.mutable_buffer() { - let size = mb_chunk.size(); - - if size > threshold.get() { - chunk.freeze().expect("cannot close open chunk"); - } - } - } -} - #[async_trait] /// Convenience implementation of `Database` so the rest of the code /// can just use Db as a `Database` even though the implementation @@ -862,7 +844,7 @@ mod tests { use crate::{ db::{ - catalog::chunk::{ChunkStage, ChunkStageFrozenRepr}, + catalog::chunk::ChunkStage, test_helpers::{try_write_lp, write_lp}, }, utils::{make_db, TestDb}, @@ -1821,40 +1803,6 @@ mod tests { assert!(chunk.time_closed().unwrap() < after_rollover); } - #[tokio::test] - async fn test_chunk_closing() { - let db = Arc::new(make_db().await.db); - db.rules.write().lifecycle_rules.mutable_size_threshold = - Some(NonZeroUsize::new(2).unwrap()); - - write_lp(&db, "cpu bar=1 10").await; - write_lp(&db, "cpu bar=1 20").await; - - let partitions = db.catalog.partition_keys(); - assert_eq!(partitions.len(), 1); - let partition_key = partitions.into_iter().next().unwrap(); - - let partition = db.catalog.partition("cpu", &partition_key).unwrap(); - let partition = partition.read(); - - let chunks: Vec<_> = partition.chunks().collect(); - assert_eq!(chunks.len(), 2); - assert!(matches!( - chunks[0].read().stage(), - ChunkStage::Frozen { - representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_), - .. - } - )); - assert!(matches!( - chunks[1].read().stage(), - ChunkStage::Frozen { - representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_), - .. - } - )); - } - #[tokio::test] async fn chunk_id_listing() { // Test that chunk id listing is hooked up From 61917c107f3ef0f6b7db8aa6dfae8fb409069d15 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Thu, 1 Jul 2021 15:45:56 -0400 Subject: [PATCH 2/2] chore: add test for can_move on row count --- lifecycle/src/policy.rs | 15 ++++++++++++--- server/src/db.rs | 4 ++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs index 79bf008772..ad42acead5 100644 --- a/lifecycle/src/policy.rs +++ b/lifecycle/src/policy.rs @@ -449,6 +449,10 @@ fn elapsed_seconds(a: DateTime, b: DateTime) -> u32 { /// /// Note: Does not check the chunk is the correct state fn can_move(rules: &LifecycleRules, chunk: &C, now: DateTime) -> bool { + if chunk.row_count() >= DEFAULT_MUB_ROW_THRESHOLD { + return true; + } + match (rules.mutable_linger_seconds, chunk.time_of_last_write()) { (Some(linger), Some(last_write)) if elapsed_seconds(now, last_write) >= linger.get() => { match ( @@ -465,9 +469,9 @@ fn can_move(rules: &LifecycleRules, chunk: &C, now: DateTime< } } - // Move it if over the row count threshold or don't move since it's either empty or the linger time hasn't expired. - // TODO: make this a configuration variable - _ => chunk.row_count() >= DEFAULT_MUB_ROW_THRESHOLD, + // Disable movement if no mutable_linger set, + // or the chunk is empty, or the linger hasn't expired + _ => false, } } @@ -879,6 +883,11 @@ mod tests { let chunk = TestChunk::new(0, Some(0), Some(70), ChunkStorage::OpenMutableBuffer); assert!(!can_move(&rules, &chunk, from_secs(71))); assert!(can_move(&rules, &chunk, from_secs(81))); + + // If over the default row count threshold, we should be able to move + let chunk = TestChunk::new(0, None, None, ChunkStorage::OpenMutableBuffer) + .with_row_count(DEFAULT_MUB_ROW_THRESHOLD); + assert!(can_move(&rules, &chunk, from_secs(0))); } #[test] diff --git a/server/src/db.rs b/server/src/db.rs index 4f3d2313ce..76477cbf50 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -683,7 +683,7 @@ impl Db { continue; } - let _ = partition.create_open_chunk(mb_chunk); + partition.create_open_chunk(mb_chunk); } }; @@ -2391,7 +2391,7 @@ mod tests { ("access", "exclusive"), ]) .counter() - .eq(2.) + .eq(1.) .unwrap(); test_db