feat: add db-level compaction limiter

pull/24376/head
Edd Robinson 2021-07-19 12:30:46 +01:00
parent 4ca19ad13f
commit dd93b2cdec
3 changed files with 109 additions and 7 deletions

View File

@@ -118,6 +118,7 @@ pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;
pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 1_000_000;
pub const DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS: u32 = 30 * 60;
pub const DEFAULT_LATE_ARRIVE_WINDOW_SECONDS: u32 = 5 * 60;
pub const DEFAULT_MAX_ACTIVE_COMPACTIONS: u32 = 14;
/// Configures how data automatically flows through the system
#[derive(Debug, Eq, PartialEq, Clone)]
@@ -144,6 +145,11 @@ pub struct LifecycleRules {
/// will sleep for this many milliseconds before looking again
pub worker_backoff_millis: NonZeroU64,
/// The maximum number of permitted concurrently executing compactions.
/// It is not currently possible to set a limit that disables compactions
/// entirely, nor is it possible to set an "unlimited" value.
pub max_active_compactions: NonZeroU32,
/// After how many transactions should IOx write a new checkpoint?
pub catalog_transactions_until_checkpoint: NonZeroU64,
@@ -179,6 +185,7 @@ impl Default for LifecycleRules {
persist: false,
immutable: false,
worker_backoff_millis: NonZeroU64::new(DEFAULT_WORKER_BACKOFF_MILLIS).unwrap(),
max_active_compactions: NonZeroU32::new(DEFAULT_MAX_ACTIVE_COMPACTIONS).unwrap(),
catalog_transactions_until_checkpoint: NonZeroU64::new(
DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
)

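For illustration only, a minimal sketch of overriding the new field when building LifecycleRules. The field name, the NonZeroU32 type, and the struct-update syntax come from the hunks above; the import path and the chosen limit of 2 (matching the test further down) are assumptions:

// Assumed import path; in IOx the lifecycle rules live in the data_types
// crate (exact module path is an assumption of this sketch).
use data_types::database_rules::LifecycleRules;
use std::num::NonZeroU32;

// Build rules that allow at most two concurrent compactions for this
// database, keeping every other lifecycle rule at its default.
fn example_rules() -> LifecycleRules {
    LifecycleRules {
        max_active_compactions: NonZeroU32::new(2).unwrap(),
        ..Default::default()
    }
}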
View File

@@ -30,6 +30,9 @@ where
/// The `LifecycleDb` this policy is automating
db: M,
/// The current number of active compactions.
active_compactions: usize,
/// Background tasks spawned by this `LifecyclePolicy`
trackers: Vec<TaskTracker<ChunkLifecycleAction>>,
}
@@ -42,6 +45,7 @@ where
Self {
db,
trackers: vec![],
active_compactions: 0,
}
}
@@ -250,6 +254,10 @@
}
if to_compact.len() >= 2 || has_mub_snapshot {
// It is the caller's responsibility to check that a compaction slot is free before attempting a compaction.
assert!(self.active_compactions < rules.max_active_compactions.get() as usize);
// Upgrade partition first
let partition = partition.upgrade();
let chunks = to_compact
@@ -261,6 +269,7 @@
.expect("failed to compact chunks")
.with_metadata(ChunkLifecycleAction::Compacting);
self.active_compactions += 1;
self.trackers.push(tracker);
}
}
@@ -475,17 +484,39 @@
// if the criteria for persistence have been satisfied,
// but persistence cannot proceed because of in-progress
// compactions
let stall_compaction = if rules.persist {
self.maybe_persist_chunks(&db_name, partition, &rules, now_instant)
let stall_compaction_persisting = if rules.persist {
let persisting =
self.maybe_persist_chunks(&db_name, partition, &rules, now_instant);
if persisting {
debug!(%db_name, %partition, reason="persisting", "stalling compaction");
}
persisting
} else {
false
};
if !stall_compaction {
self.maybe_compact_chunks(partition, &rules, now);
} else {
debug!(%db_name, %partition, "stalling compaction to allow persist");
// Until we have a more sophisticated compaction policy that can
// allocate resources appropriately, we limit the number of
// compactions that may run concurrently. Because
// max_active_compactions is a NonZeroU32, the limit can neither
// disable compactions entirely nor be set to an "unlimited" value.
let stall_compaction_no_slots = {
let max_compactions = self.db.rules().max_active_compactions.get();
let slots_full = self.active_compactions >= max_compactions as usize;
if slots_full {
debug!(%db_name, %partition, ?max_compactions, reason="slots_full", "stalling compaction");
}
slots_full
};
// Conditions under which no compaction will be scheduled for this partition.
if stall_compaction_persisting || stall_compaction_no_slots {
continue;
}
// possibly do a compaction
self.maybe_compact_chunks(partition, &rules, now);
}
if let Some(soft_limit) = rules.buffer_size_soft {
@@ -498,7 +529,24 @@ where
}
// Clear out completed tasks
self.trackers.retain(|x| !x.is_complete());
let mut completed_compactions = 0;
self.trackers.retain(|x| {
if x.is_complete() && matches!(x.metadata(), ChunkLifecycleAction::Compacting) {
// free up slot for another compaction
completed_compactions += 1;
}
!x.is_complete()
});
// update active compactions
if completed_compactions > 0 {
debug!(?completed_compactions, active_compactions=?self.active_compactions,
max_compactions=?self.db.rules().max_active_compactions, "releasing compaction slots")
}
assert!(completed_compactions <= self.active_compactions);
self.active_compactions -= completed_compactions;
let tracker_fut = match self.trackers.is_empty() {
false => futures::future::Either::Left(futures::future::select_all(
@@ -1437,6 +1485,52 @@ mod tests {
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![17, 18])]);
}
#[test]
fn test_compaction_limiter() {
let rules = LifecycleRules {
max_active_compactions: 2.try_into().unwrap(),
..Default::default()
};
let now = from_secs(50);
let partitions = vec![
TestPartition::new(vec![
// closed => can compact
TestChunk::new(0, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer),
// closed => can compact
TestChunk::new(10, Some(0), Some(30), ChunkStorage::ClosedMutableBuffer),
// closed => can compact
TestChunk::new(12, Some(0), Some(40), ChunkStorage::ClosedMutableBuffer),
]),
TestPartition::new(vec![
// closed => can compact
TestChunk::new(1, Some(0), Some(20), ChunkStorage::ClosedMutableBuffer),
]),
TestPartition::new(vec![
// closed => can compact
TestChunk::new(200, Some(0), Some(10), ChunkStorage::ClosedMutableBuffer),
]),
];
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new(&db);
lifecycle.check_for_work(now, Instant::now());
assert_eq!(
*db.events.read(),
vec![
MoverEvents::Compact(vec![0, 10, 12]),
MoverEvents::Compact(vec![1]),
],
);
db.events.write().clear();
// Compaction slots freed up, other partition can now compact.
lifecycle.check_for_work(now, Instant::now());
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![200])]);
}
#[test]
fn test_persist() {
let rules = LifecycleRules {

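To make the bookkeeping in the policy changes above easier to follow, here is a standalone sketch of the same slot-accounting pattern: reserve a slot when a compaction is scheduled, release one slot per completed compacting task. The type and method names are illustrative and are not part of this commit:

use std::num::NonZeroU32;

/// Illustrative stand-in for the policy's `active_compactions` counter.
struct CompactionSlots {
    active: usize,
    max: usize,
}

impl CompactionSlots {
    fn new(max: NonZeroU32) -> Self {
        Self { active: 0, max: max.get() as usize }
    }

    /// Reserve a slot if one is free; mirrors the check made before
    /// scheduling a compaction and the `active_compactions += 1` on success.
    fn try_acquire(&mut self) -> bool {
        if self.active < self.max {
            self.active += 1;
            true
        } else {
            false
        }
    }

    /// Release slots for compactions whose trackers completed; mirrors the
    /// `completed_compactions` accounting when trackers are retained.
    fn release(&mut self, completed: usize) {
        assert!(completed <= self.active);
        self.active -= completed;
    }
}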
View File

@@ -185,6 +185,7 @@ pub async fn command(url: String, config: Config) -> Result<()> {
persist: command.persist,
immutable: command.immutable,
worker_backoff_millis: Default::default(),
max_active_compactions: Default::default(),
catalog_transactions_until_checkpoint: command
.catalog_transactions_until_checkpoint
.get(),