diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs
index 86b4604a6e..7d5a077bb1 100644
--- a/data_types/src/database_rules.rs
+++ b/data_types/src/database_rules.rs
@@ -108,7 +108,8 @@ impl Partitioner for DatabaseRules {
 
 pub const DEFAULT_WORKER_BACKOFF_MILLIS: u64 = 1_000;
 pub const DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT: u64 = 100;
-pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 100_000;
+pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;
+pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 1_000_000;
 pub const DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS: u32 = 30 * 60;
 pub const DEFAULT_LATE_ARRIVE_WINDOW_SECONDS: u32 = 5 * 60;
 
diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index 7e33642551..374c0e8721 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -6,7 +6,7 @@
 use chrono::{DateTime, Utc};
 use futures::future::BoxFuture;
 
 use data_types::chunk_metadata::ChunkStorage;
-use data_types::database_rules::LifecycleRules;
+use data_types::database_rules::{LifecycleRules, DEFAULT_MUB_ROW_THRESHOLD};
 use observability_deps::tracing::{debug, info, warn};
 use tracker::TaskTracker;
@@ -449,6 +449,10 @@ fn elapsed_seconds(a: DateTime<Utc>, b: DateTime<Utc>) -> u32 {
 ///
 /// Note: Does not check the chunk is the correct state
 fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: DateTime<Utc>) -> bool {
+    if chunk.row_count() >= DEFAULT_MUB_ROW_THRESHOLD {
+        return true;
+    }
+
     match (rules.mutable_linger_seconds, chunk.time_of_last_write()) {
         (Some(linger), Some(last_write)) if elapsed_seconds(now, last_write) >= linger.get() => {
             match (
@@ -879,6 +883,11 @@ mod tests {
         let chunk = TestChunk::new(0, Some(0), Some(70), ChunkStorage::OpenMutableBuffer);
         assert!(!can_move(&rules, &chunk, from_secs(71)));
         assert!(can_move(&rules, &chunk, from_secs(81)));
+
+        // If over the default row count threshold, we should be able to move
+        let chunk = TestChunk::new(0, None, None, ChunkStorage::OpenMutableBuffer)
+            .with_row_count(DEFAULT_MUB_ROW_THRESHOLD);
+        assert!(can_move(&rules, &chunk, from_secs(0)));
     }
 
     #[test]
diff --git a/server/src/db.rs b/server/src/db.rs
index 18adfbf2ab..f848295aea 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -43,7 +43,6 @@ use snafu::{ensure, ResultExt, Snafu};
 use std::{
     any::Any,
     collections::HashMap,
-    num::NonZeroUsize,
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -599,7 +598,6 @@ impl Db {
     pub fn store_sequenced_entry(&self, sequenced_entry: Arc<SequencedEntry>) -> Result<()> {
         // Get all needed database rule values, then release the lock
         let rules = self.rules.read();
-        let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold;
         let immutable = rules.lifecycle_rules.immutable;
         let buffer_size_hard = rules.lifecycle_rules.buffer_size_hard;
         let late_arrival_window = rules.lifecycle_rules.late_arrive_window();
@@ -665,8 +663,6 @@ impl Db {
                             }
                             continue;
                         };
-
-                        check_chunk_closed(&mut *chunk, mutable_size_threshold);
                     }
                     None => {
                         let metrics = self.metrics_registry.register_domain_with_labels(
@@ -691,8 +687,7 @@ impl Db {
                             continue;
                         }
 
-                        let new_chunk = partition.create_open_chunk(mb_chunk);
-                        check_chunk_closed(&mut *new_chunk.write(), mutable_size_threshold);
+                        partition.create_open_chunk(mb_chunk);
                     }
                 };
 
@@ -728,19 +723,6 @@ impl Db {
     }
 }
 
-/// Check if the given chunk should be closed based on the the MutableBuffer size threshold.
-fn check_chunk_closed(chunk: &mut CatalogChunk, mutable_size_threshold: Option<NonZeroUsize>) {
-    if let Some(threshold) = mutable_size_threshold {
-        if let Ok(mb_chunk) = chunk.mutable_buffer() {
-            let size = mb_chunk.size();
-
-            if size > threshold.get() {
-                chunk.freeze().expect("cannot close open chunk");
-            }
-        }
-    }
-}
-
 #[async_trait]
 /// Convenience implementation of `Database` so the rest of the code
 /// can just use Db as a `Database` even though the implementation
@@ -866,7 +848,7 @@ mod tests {
     use crate::{
         db::{
-            catalog::chunk::{ChunkStage, ChunkStageFrozenRepr},
+            catalog::chunk::ChunkStage,
             test_helpers::{try_write_lp, write_lp},
         },
         utils::{make_db, TestDb},
@@ -1824,40 +1806,6 @@ mod tests {
         assert!(chunk.time_closed().unwrap() < after_rollover);
     }
 
-    #[tokio::test]
-    async fn test_chunk_closing() {
-        let db = Arc::new(make_db().await.db);
-        db.rules.write().lifecycle_rules.mutable_size_threshold =
-            Some(NonZeroUsize::new(2).unwrap());
-
-        write_lp(&db, "cpu bar=1 10").await;
-        write_lp(&db, "cpu bar=1 20").await;
-
-        let partitions = db.catalog.partition_keys();
-        assert_eq!(partitions.len(), 1);
-        let partition_key = partitions.into_iter().next().unwrap();
-
-        let partition = db.catalog.partition("cpu", &partition_key).unwrap();
-        let partition = partition.read();
-
-        let chunks: Vec<_> = partition.chunks().collect();
-        assert_eq!(chunks.len(), 2);
-        assert!(matches!(
-            chunks[0].read().stage(),
-            ChunkStage::Frozen {
-                representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_),
-                ..
-            }
-        ));
-        assert!(matches!(
-            chunks[1].read().stage(),
-            ChunkStage::Frozen {
-                representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_),
-                ..
-            }
-        ));
-    }
-
     #[tokio::test]
     async fn chunk_id_listing() {
         // Test that chunk id listing is hooked up
@@ -2446,7 +2394,7 @@ mod tests {
                 ("access", "exclusive"),
             ])
             .counter()
-            .eq(2.)
+            .eq(1.)
             .unwrap();
 
         test_db
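
Note on the behavioral change in this diff: the size-based closing of open mutable-buffer chunks on the write path (check_chunk_closed, driven by mutable_size_threshold) is removed, and the lifecycle policy instead treats any open chunk whose row count has reached DEFAULT_MUB_ROW_THRESHOLD (100_000 rows) as immediately movable, before the mutable_linger_seconds rule is consulted. The standalone sketch below restates that rule under simplified assumptions; the parameter names (row_count, seconds_since_last_write, linger_seconds) are illustrative stand-ins, not the crate's actual LifecycleChunk API, and the real can_move also checks further conditions such as the minimum-age rule.

// Minimal sketch of the new move rule (assumed simplification, not the real implementation).
const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;

fn can_move_simplified(
    row_count: usize,
    seconds_since_last_write: Option<u32>,
    linger_seconds: Option<u32>,
) -> bool {
    // New short-circuit: a chunk at or over the row threshold is movable
    // regardless of any linger configuration.
    if row_count >= DEFAULT_MUB_ROW_THRESHOLD {
        return true;
    }
    // Otherwise the chunk becomes movable only after the linger window
    // has elapsed since the last write.
    match (linger_seconds, seconds_since_last_write) {
        (Some(linger), Some(elapsed)) => elapsed >= linger,
        _ => false,
    }
}

fn main() {
    // At the threshold: movable immediately, mirroring the new policy.rs test.
    assert!(can_move_simplified(DEFAULT_MUB_ROW_THRESHOLD, Some(0), Some(300)));
    // Below the threshold: still governed by the linger rule.
    assert!(!can_move_simplified(10, Some(10), Some(300)));
    assert!(can_move_simplified(10, Some(301), Some(300)));
}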