feat: easy way to suppress persitence from lifecycle policy

pull/24376/head
Marco Neumann 2021-08-02 12:58:33 +02:00
parent 98d4c9fca1
commit d1a4584dfc
1 changed files with 74 additions and 1 deletions

View File

@ -37,20 +37,44 @@ where
/// Background tasks spawned by this `LifecyclePolicy`
trackers: Vec<TaskTracker<ChunkLifecycleAction>>,
/// Do not allow persistence even when the database rules would allow that.
///
/// This can be helpful during some phases of the database startup process.
suppress_persistence: bool,
}
impl<M> LifecyclePolicy<M>
where
M: LifecycleDb,
{
/// Create new policy.
///
/// Persistence is allowed if the database rules allow it.
pub fn new(db: M) -> Self {
Self {
db,
trackers: vec![],
active_compactions: 0,
suppress_persistence: false,
}
}
/// Create new policy that suppresses persistence even when the database rules allow it.
pub fn new_suppress_persistence(db: M) -> Self {
Self {
db,
trackers: vec![],
active_compactions: 0,
suppress_persistence: true,
}
}
/// Stop suppressing persistence and allow it if the database rules allow it.
pub fn unsuppress_persistence(&mut self) {
self.suppress_persistence = false;
}
/// Check if database exceeds memory limits and free memory if necessary
///
/// The policy will first try to unload persisted chunks in order of creation
@ -485,7 +509,7 @@ where
// if the criteria for persistence have been satisfied,
// but persistence cannot proceed because of in-progress
// compactions
let stall_compaction_persisting = if rules.persist {
let stall_compaction_persisting = if rules.persist && !self.suppress_persistence {
let persisting =
self.maybe_persist_chunks(&db_name, partition, &rules, now_instant);
if persisting {
@ -834,6 +858,12 @@ mod tests {
for chunk in &chunks {
partition.chunks.remove(&chunk.addr.chunk_id);
new_chunk.row_count += chunk.row_count;
new_chunk.min_timestamp = match (new_chunk.min_timestamp, chunk.min_timestamp) {
(Some(ts1), Some(ts2)) => Some(ts1.min(ts2)),
(Some(ts), None) => Some(ts),
(None, Some(ts)) => Some(ts),
(None, None) => None,
};
}
partition
@ -1585,6 +1615,49 @@ mod tests {
);
}
#[test]
fn test_suppress_persistence() {
let rules = LifecycleRules {
persist: true,
persist_row_threshold: NonZeroUsize::new(1_000).unwrap(),
late_arrive_window_seconds: NonZeroU32::new(10).unwrap(),
persist_age_threshold_seconds: NonZeroU32::new(10).unwrap(),
max_active_compactions: MaxActiveCompactions(NonZeroU32::new(10).unwrap()),
..Default::default()
};
let now = Instant::now();
let partitions = vec![
// Sufficient rows => could persist but should be suppressed
TestPartition::new(vec![
TestChunk::new(2, 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)),
TestChunk::new(3, 0, ChunkStorage::ReadBuffer).with_min_timestamp(from_secs(5)),
])
.with_persistence(1_000, now, from_secs(20)),
];
let db = TestDb::from_partitions(rules, partitions);
let mut lifecycle = LifecyclePolicy::new_suppress_persistence(&db);
lifecycle.check_for_work(from_secs(0), now);
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![2, 3]),]);
lifecycle.check_for_work(from_secs(0), now);
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![2, 3]),]);
lifecycle.unsuppress_persistence();
lifecycle.check_for_work(from_secs(0), now);
assert_eq!(
*db.events.read(),
vec![
MoverEvents::Compact(vec![2, 3]),
MoverEvents::Persist(vec![4])
]
);
}
#[test]
fn test_moves_open() {
let rules = LifecycleRules {