Merge pull request #1875 from influxdata/pd-remove-mb-size-limit-checks
feat: remove MUB size threshold

commit 8386b1528e
@@ -108,7 +108,8 @@ impl Partitioner for DatabaseRules
 pub const DEFAULT_WORKER_BACKOFF_MILLIS: u64 = 1_000;
 pub const DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT: u64 = 100;
-pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 100_000;
+pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;
+pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 1_000_000;
 pub const DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS: u32 = 30 * 60;
 pub const DEFAULT_LATE_ARRIVE_WINDOW_SECONDS: u32 = 5 * 60;
 
 
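The old single row default is split in two here: mutable-buffer chunks become movable at 100k rows, while persistence now waits for 1M rows. A minimal sketch of how the two thresholds divide responsibilities; SketchLifecycleRules below is a hypothetical stand-in, not the crate's actual LifecycleRules API:

    // Sketch only: illustrates which default governs which lifecycle stage.
    // Field names are assumptions for this example, not the real struct.
    pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;
    pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 1_000_000;

    struct SketchLifecycleRules {
        /// Rows at which an open mutable-buffer chunk may be moved (closed).
        mub_row_threshold: usize,
        /// Rows at which a partition becomes a candidate for persistence.
        persist_row_threshold: usize,
    }

    impl Default for SketchLifecycleRules {
        fn default() -> Self {
            Self {
                mub_row_threshold: DEFAULT_MUB_ROW_THRESHOLD,
                persist_row_threshold: DEFAULT_PERSIST_ROW_THRESHOLD,
            }
        }
    }

    fn main() {
        let rules = SketchLifecycleRules::default();
        // Chunks move out of the mutable buffer well before persistence kicks in.
        assert!(rules.mub_row_threshold < rules.persist_row_threshold);
    }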
@@ -6,7 +6,7 @@ use chrono::{DateTime, Utc};
 use futures::future::BoxFuture;
 
 use data_types::chunk_metadata::ChunkStorage;
-use data_types::database_rules::LifecycleRules;
+use data_types::database_rules::{LifecycleRules, DEFAULT_MUB_ROW_THRESHOLD};
 use observability_deps::tracing::{debug, info, warn};
 use tracker::TaskTracker;
 
@@ -449,6 +449,10 @@ fn elapsed_seconds(a: DateTime<Utc>, b: DateTime<Utc>) -> u32 {
 ///
 /// Note: Does not check the chunk is the correct state
 fn can_move<C: LifecycleChunk>(rules: &LifecycleRules, chunk: &C, now: DateTime<Utc>) -> bool {
+    if chunk.row_count() >= DEFAULT_MUB_ROW_THRESHOLD {
+        return true;
+    }
+
     match (rules.mutable_linger_seconds, chunk.time_of_last_write()) {
         (Some(linger), Some(last_write)) if elapsed_seconds(now, last_write) >= linger.get() => {
             match (
@@ -879,6 +883,11 @@ mod tests {
         let chunk = TestChunk::new(0, Some(0), Some(70), ChunkStorage::OpenMutableBuffer);
         assert!(!can_move(&rules, &chunk, from_secs(71)));
         assert!(can_move(&rules, &chunk, from_secs(81)));
+
+        // If over the default row count threshold, we should be able to move
+        let chunk = TestChunk::new(0, None, None, ChunkStorage::OpenMutableBuffer)
+            .with_row_count(DEFAULT_MUB_ROW_THRESHOLD);
+        assert!(can_move(&rules, &chunk, from_secs(0)));
     }
 
     #[test]
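The core behavioral change is in can_move: a chunk at or over the row threshold is movable immediately, and the linger check only applies below it, exactly as the new test above exercises. A self-contained sketch of that short-circuit; the Chunk struct and the plain u32 linger are simplifying assumptions standing in for the crate's LifecycleChunk trait and NonZeroU32 rules:

    use chrono::{DateTime, Duration, Utc};

    const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;

    // Stand-in for the crate's LifecycleChunk trait; only what can_move needs.
    struct Chunk {
        row_count: usize,
        time_of_last_write: Option<DateTime<Utc>>,
    }

    fn elapsed_seconds(a: DateTime<Utc>, b: DateTime<Utc>) -> u32 {
        (a - b).num_seconds().max(0) as u32
    }

    // Mirrors the new logic: the row count short-circuits the linger check.
    fn can_move(mutable_linger_seconds: Option<u32>, chunk: &Chunk, now: DateTime<Utc>) -> bool {
        if chunk.row_count >= DEFAULT_MUB_ROW_THRESHOLD {
            return true;
        }
        match (mutable_linger_seconds, chunk.time_of_last_write) {
            (Some(linger), Some(last_write)) => elapsed_seconds(now, last_write) >= linger,
            // No linger configured or no writes yet: not movable on time alone.
            _ => false,
        }
    }

    fn main() {
        let now = Utc::now();

        // Over the row threshold: movable immediately, with no elapsed time at all.
        let big = Chunk { row_count: DEFAULT_MUB_ROW_THRESHOLD, time_of_last_write: Some(now) };
        assert!(can_move(Some(300), &big, now));

        // Under the threshold: falls back to the linger check.
        let small = Chunk { row_count: 10, time_of_last_write: Some(now - Duration::seconds(301)) };
        assert!(can_move(Some(300), &small, now));
        assert!(!can_move(Some(300), &small, now - Duration::seconds(200)));
    }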
@@ -43,7 +43,6 @@ use snafu::{ensure, ResultExt, Snafu};
 use std::{
     any::Any,
     collections::HashMap,
-    num::NonZeroUsize,
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -599,7 +598,6 @@ impl Db {
     pub fn store_sequenced_entry(&self, sequenced_entry: Arc<SequencedEntry>) -> Result<()> {
         // Get all needed database rule values, then release the lock
         let rules = self.rules.read();
-        let mutable_size_threshold = rules.lifecycle_rules.mutable_size_threshold;
         let immutable = rules.lifecycle_rules.immutable;
         let buffer_size_hard = rules.lifecycle_rules.buffer_size_hard;
         let late_arrival_window = rules.lifecycle_rules.late_arrive_window();
@@ -665,8 +663,6 @@ impl Db {
                     }
                     continue;
                 };
-
-                check_chunk_closed(&mut *chunk, mutable_size_threshold);
             }
             None => {
                 let metrics = self.metrics_registry.register_domain_with_labels(
@@ -691,8 +687,7 @@ impl Db {
                     continue;
                 }
 
-                let new_chunk = partition.create_open_chunk(mb_chunk);
-                check_chunk_closed(&mut *new_chunk.write(), mutable_size_threshold);
+                partition.create_open_chunk(mb_chunk);
             }
         };
 
@@ -728,19 +723,6 @@ impl Db {
     }
 }
 
-/// Check if the given chunk should be closed based on the the MutableBuffer size threshold.
-fn check_chunk_closed(chunk: &mut CatalogChunk, mutable_size_threshold: Option<NonZeroUsize>) {
-    if let Some(threshold) = mutable_size_threshold {
-        if let Ok(mb_chunk) = chunk.mutable_buffer() {
-            let size = mb_chunk.size();
-
-            if size > threshold.get() {
-                chunk.freeze().expect("cannot close open chunk");
-            }
-        }
-    }
-}
-
 #[async_trait]
 /// Convenience implementation of `Database` so the rest of the code
 /// can just use Db as a `Database` even though the implementation
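What disappears with check_chunk_closed is size-based closing on the write path: a chunk no longer freezes the moment its in-memory size crosses mutable_size_threshold, and instead stays open until the lifecycle's row/age rules move it. A miniature before/after of the two policies; the types and numbers are illustrative only, not excerpts of db.rs:

    // MiniChunk is a toy model, not the crate's CatalogChunk.
    struct MiniChunk {
        rows: usize,
        bytes: usize,
        frozen: bool,
    }

    /// Old behaviour (removed in this PR): freeze on write once the chunk's
    /// in-memory size exceeds the configured mutable_size_threshold.
    fn old_check_chunk_closed(chunk: &mut MiniChunk, mutable_size_threshold: Option<usize>) {
        if let Some(threshold) = mutable_size_threshold {
            if chunk.bytes > threshold {
                chunk.frozen = true;
            }
        }
    }

    /// New behaviour: the write path leaves the chunk open; the lifecycle
    /// later moves it based on row count (or linger time, elided here).
    fn new_can_move(chunk: &MiniChunk, mub_row_threshold: usize) -> bool {
        chunk.rows >= mub_row_threshold
    }

    fn main() {
        // Old policy: with a 2-byte threshold, even a tiny write froze the
        // chunk immediately (the deleted test below relied on exactly this).
        let mut old_chunk = MiniChunk { rows: 2, bytes: 64, frozen: false };
        old_check_chunk_closed(&mut old_chunk, Some(2));
        assert!(old_chunk.frozen);

        // New policy: the same write leaves the chunk open; it only becomes
        // movable once it accumulates real data.
        let new_chunk = MiniChunk { rows: 2, bytes: 64, frozen: false };
        assert!(!new_can_move(&new_chunk, 100_000));
    }

This is also why the test_chunk_closing test and the ChunkStageFrozenRepr import are deleted below, and the lock-metric expectation at the end drops from .eq(2.) to .eq(1.), consistent with a small write now yielding a single open chunk rather than two frozen ones.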
@@ -866,7 +848,7 @@ mod tests {
 
     use crate::{
         db::{
-            catalog::chunk::{ChunkStage, ChunkStageFrozenRepr},
+            catalog::chunk::ChunkStage,
             test_helpers::{try_write_lp, write_lp},
         },
         utils::{make_db, TestDb},
@@ -1824,40 +1806,6 @@ mod tests {
         assert!(chunk.time_closed().unwrap() < after_rollover);
     }
 
-    #[tokio::test]
-    async fn test_chunk_closing() {
-        let db = Arc::new(make_db().await.db);
-        db.rules.write().lifecycle_rules.mutable_size_threshold =
-            Some(NonZeroUsize::new(2).unwrap());
-
-        write_lp(&db, "cpu bar=1 10").await;
-        write_lp(&db, "cpu bar=1 20").await;
-
-        let partitions = db.catalog.partition_keys();
-        assert_eq!(partitions.len(), 1);
-        let partition_key = partitions.into_iter().next().unwrap();
-
-        let partition = db.catalog.partition("cpu", &partition_key).unwrap();
-        let partition = partition.read();
-
-        let chunks: Vec<_> = partition.chunks().collect();
-        assert_eq!(chunks.len(), 2);
-        assert!(matches!(
-            chunks[0].read().stage(),
-            ChunkStage::Frozen {
-                representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_),
-                ..
-            }
-        ));
-        assert!(matches!(
-            chunks[1].read().stage(),
-            ChunkStage::Frozen {
-                representation: ChunkStageFrozenRepr::MutableBufferSnapshot(_),
-                ..
-            }
-        ));
-    }
-
     #[tokio::test]
     async fn chunk_id_listing() {
         // Test that chunk id listing is hooked up
@@ -2446,7 +2394,7 @@ mod tests {
                 ("access", "exclusive"),
             ])
             .counter()
-            .eq(2.)
+            .eq(1.)
             .unwrap();
 
         test_db