feat: trigger persistence if over soft limit and no evictable chunks (#3791)

* feat: trigger persistence if over soft limit and no evictable chunks

* chore: fmt

* fix: avoid test_full_lifecycle exceeding soft limit

* fix: don't expect chunk to be unloaded

* feat: only trigger if no outstanding persist

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Raphael Taylor-Davies 2022-02-21 10:14:27 +00:00 committed by GitHub
parent 271daa5eac
commit 39c42678d7
2 changed files with 111 additions and 18 deletions
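In brief, the policy change: when the buffer is over `buffer_size_soft`, eviction could not bring it back under the limit, `persist` is enabled, and no persist job is already in flight, the lifecycle now force-persists the partition with the most persistable rows. A minimal standalone sketch of that trigger decision (hypothetical helper, simplified from the diff below):

```rust
// Sketch of the trigger condition added by this commit (hypothetical
// helper; the real logic lives in maybe_free_memory in the diff below).
fn should_force_persist(
    still_over_soft_limit: bool, // eviction failed to get under `buffer_size_soft`
    persist_enabled: bool,       // `rules.persist`
    persist_job_running: bool,   // a tracker with ChunkLifecycleAction::Persisting
) -> bool {
    still_over_soft_limit && persist_enabled && !persist_job_running
}

fn main() {
    // Over the limit with nothing evictable and no job in flight: trigger
    assert!(should_force_persist(true, true, false));
    // An outstanding persist job suppresses a second trigger
    assert!(!should_force_persist(true, true, true));
}
```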


@@ -97,6 +97,8 @@ async fn test_full_lifecycle() {
         // Only trigger persistence once we've finished writing
         .persist_row_threshold(total_rows)
         .persist_age_threshold_seconds(1000)
+        // Prevent persistence being triggered by the soft limit
+        .buffer_size_soft(5 * 1024 * 1024) // 5 MB
         // A low late arrival time to speed up the test
         .late_arrive_window_seconds(1)
         .build(fixture.grpc_channel())
@@ -114,7 +116,7 @@ async fn test_full_lifecycle() {
     wait_for_exact_chunk_states(
         &fixture,
         &db_name,
-        vec![ChunkStorage::ObjectStoreOnly],
+        vec![ChunkStorage::ReadBufferAndObjectStore],
         std::time::Duration::from_secs(10),
     )
     .await;


@@ -75,17 +75,19 @@ where
     /// - If persist is `true` it will only unload persisted chunks in order of creation time, starting with the oldest.
     /// - If persist is `false` it will consider all chunks, also in order of creation time, starting with the oldest.
     ///
+    /// If it cannot evict chunks from memory, returns the index of a partition to persist
+    ///
     fn maybe_free_memory<P: LockablePartition>(
         &mut self,
         db_name: &DatabaseName<'static>,
         partitions: &[P],
         soft_limit: usize,
         persist: bool,
-    ) {
+    ) -> Option<usize> {
         let buffer_size = self.db.buffer_size();
         if buffer_size < soft_limit {
             trace!(%db_name, buffer_size, %soft_limit, "memory use under soft limit");
-            return;
+            return None;
         }

         // Collect a list of candidates to free memory
@@ -128,7 +130,7 @@ where
             let buffer_size = self.db.buffer_size();
             if buffer_size < soft_limit {
                 trace!(%db_name, buffer_size, %soft_limit, "memory use under soft limit");
-                break;
+                return None;
             }

             trace!(%db_name, buffer_size, %soft_limit, "memory use over soft limit");
@@ -198,6 +200,28 @@ where
                 }
             }
         }
+
+        let has_outstanding_persistence_job = self
+            .trackers
+            .iter()
+            .any(|x| matches!(x.metadata(), ChunkLifecycleAction::Persisting));
+
+        if persist && !has_outstanding_persistence_job {
+            debug!(%db_name, "no chunks found that could be evicted, persisting largest partition");
+            let mut max_rows = 0_usize;
+            let mut candidate = None;
+            for (partition_idx, partition) in partitions.iter().enumerate() {
+                let guard = partition.read();
+                let persistable_rows = guard.persistable_row_count();
+                if persistable_rows > max_rows {
+                    candidate = Some(partition_idx);
+                    max_rows = persistable_rows;
+                }
+            }
+            return candidate;
+        }
+
+        None
     }

     /// Find chunks to compact together
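The candidate search added above is a first-strictly-greater max scan. For reference, an equivalent standalone sketch using iterator adaptors (`persistable_rows` stands in for `partition.read().persistable_row_count()`; note that `max_by_key` keeps the *last* maximum on ties, whereas the loop in the diff keeps the first):

```rust
// Standalone sketch of the "largest persistable partition" selection.
fn largest_partition(persistable_rows: &[usize]) -> Option<usize> {
    persistable_rows
        .iter()
        .enumerate()
        .filter(|&(_, &rows)| rows > 0) // mirrors `persistable_rows > max_rows` with max_rows = 0
        .max_by_key(|&(_, &rows)| rows)
        .map(|(idx, _)| idx)
}

fn main() {
    // Partition 1 has the most persistable rows, so it is the candidate
    assert_eq!(largest_partition(&[50, 100, 0]), Some(1));
    // Nothing persistable anywhere: no candidate, mirroring the None return
    assert_eq!(largest_partition(&[0, 0]), None);
}
```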
@@ -306,14 +330,11 @@ where
     /// all data that is not yet eligible for persistence (it was
     /// written to recently)
     ///
-    /// A chunk will be chosen for the persist operation if either:
+    /// A partition will be persisted if:
     ///
-    /// 1. it has more than `persist_row_threshold` rows
-    /// 2. it was last written to more than `late_arrive_window_seconds` ago
-    ///
-    /// Returns true if persistence is being blocked by compaction,
-    /// signaling compaction should be stalled to allow persistence to
-    /// make progress
+    /// 1. it has more than `persist_row_threshold` rows that can be persisted
+    /// 2. it contains writes that arrived more than `persist_age_threshold_seconds` ago
+    /// 3. `force` is true and there are rows to be persisted
     ///
+    /// Returns a boolean to indicate if it should stall compaction to allow
+    /// persistence to make progress
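Condensed into a single predicate, the three conditions read as the following standalone sketch (hypothetical helper; the real rules use non-zero threshold types, and the comparisons in the diff are `>=`, as here):

```rust
// Hypothetical condensation of the three persistence conditions above.
fn partition_should_persist(
    persistable_rows: usize,
    persistable_age_secs: u64,
    force: bool,
    row_threshold: usize,    // `rules.persist_row_threshold`
    age_threshold_secs: u64, // `rules.persist_age_threshold_seconds`
) -> bool {
    (force && persistable_rows > 0)
        || persistable_rows >= row_threshold
        || persistable_age_secs >= age_threshold_secs
}

fn main() {
    // Forced: persists even though both thresholds are unmet
    assert!(partition_should_persist(10, 0, true, 1000, 3600));
    // Not forced and under both thresholds: skipped
    assert!(!partition_should_persist(10, 0, false, 1000, 3600));
}
```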
@@ -335,6 +356,7 @@ where
         partition: &P,
         rules: &LifecycleRules,
         now: Time,
+        force: bool,
     ) -> bool {
         // TODO: Encapsulate locking into a CatalogTransaction type
         let partition = partition.read();
@@ -362,7 +384,9 @@ where
             rules_persist_age_threshold_seconds=%rules.persist_age_threshold_seconds.get(),
             "considering for persistence");

-        if persistable_row_count >= rules.persist_row_threshold.get() {
+        if force {
+            debug!(%db_name, %partition, "persisting partition as force set");
+        } else if persistable_row_count >= rules.persist_row_threshold.get() {
             debug!(%db_name, %partition, persistable_row_count, "persisting partition as exceeds row threshold");
         } else if persistable_age_seconds >= rules.persist_age_threshold_seconds.get() as u64 {
             debug!(%db_name, %partition, persistable_age_seconds, "persisting partition as exceeds age threshold");
@@ -446,7 +470,11 @@ where
         let rules = self.db.rules();
         let partitions = self.db.partitions();

-        for partition in &partitions {
+        let force_persist_partition_idx = rules.buffer_size_soft.and_then(|soft_limit| {
+            self.maybe_free_memory(&db_name, &partitions, soft_limit.get(), rules.persist)
+        });
+
+        for (partition_idx, partition) in partitions.iter().enumerate() {
             self.maybe_cleanup_failed(&db_name, partition, now);

             // Persistence cannot split chunks if they are currently being compacted
@@ -457,7 +485,11 @@ where
             // but persistence cannot proceed because of in-progress
             // compactions
             let stall_compaction_persisting = if rules.persist && !self.suppress_persistence {
-                let persisting = self.maybe_persist_chunks(&db_name, partition, &rules, now);
+                let force = force_persist_partition_idx
+                    .map(|x| x == partition_idx)
+                    .unwrap_or(false);
+
+                let persisting = self.maybe_persist_chunks(&db_name, partition, &rules, now, force);
                 if persisting {
                     debug!(%db_name, partition=%partition.read(), reason="persisting", "stalling compaction");
                 }
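A note on the `force` derivation: comparing the `Option` directly is an equivalent, slightly terser spelling (sketch, same semantics as the `.map(..).unwrap_or(false)` chain above):

```rust
fn main() {
    let force_persist_partition_idx: Option<usize> = Some(2);
    for partition_idx in 0..4 {
        // Equivalent to .map(|x| x == partition_idx).unwrap_or(false)
        let force = force_persist_partition_idx == Some(partition_idx);
        println!("partition {partition_idx}: force = {force}");
    }
}
```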
@@ -490,10 +522,6 @@ where
                 self.maybe_compact_chunks(partition, &rules, now);
             }

-        if let Some(soft_limit) = rules.buffer_size_soft {
-            self.maybe_free_memory(&db_name, &partitions, soft_limit.get(), rules.persist)
-        }
-
         // Clear out completed tasks
         let mut completed_compactions = 0;
         self.trackers.retain(|x| {
@@ -1870,6 +1898,69 @@ mod tests {
         );
     }

+    #[test]
+    fn test_persist_soft_limit() {
+        // Test that the lifecycle will trigger persistence if it cannot free chunks
+        let rules = LifecycleRules {
+            buffer_size_soft: Some(NonZeroUsize::new(5).unwrap()),
+            persist: true,
+            ..Default::default()
+        };
+
+        // Should persist the partition with the most persistable rows
+        let partitions = vec![
+            TestPartition::new(vec![
+                TestChunk::new(ChunkId::new_test(0), 0, ChunkStorage::ObjectStoreOnly),
+                TestChunk::new(ChunkId::new_test(1), 0, ChunkStorage::OpenMutableBuffer),
+            ])
+            .with_persistence(50, from_secs(0), from_secs(0)),
+            TestPartition::new(vec![
+                TestChunk::new(ChunkId::new_test(2), 0, ChunkStorage::ObjectStoreOnly),
+                // Should persist this chunk to free memory
+                TestChunk::new(ChunkId::new_test(3), 0, ChunkStorage::OpenMutableBuffer)
+                    .with_min_timestamp(from_secs(0)),
+                // Should skip this chunk as only persistable up to 5 seconds
+                TestChunk::new(ChunkId::new_test(4), 0, ChunkStorage::OpenMutableBuffer)
+                    .with_min_timestamp(from_secs(6)),
+            ])
+            .with_persistence(100, from_secs(0), from_secs(5)),
+        ];
+
+        let (db, _) = test_db_partitions(rules.clone(), partitions, from_secs(0));
+        let mut lifecycle = LifecyclePolicy::new(&db);
+
+        lifecycle.check_for_work();
+        assert_eq!(
+            *db.events.read(),
+            vec![MoverEvents::Persist(vec![ChunkId::new_test(3)])]
+        );
+
+        // Shouldn't trigger persistence if already persisting
+        let partitions = vec![TestPartition::new(vec![TestChunk::new(
+            ChunkId::new_test(0),
+            0,
+            ChunkStorage::ReadBuffer,
+        )
+        .with_min_timestamp(from_secs(0))])
+        .with_persistence(100, from_secs(0), from_secs(0))];
+
+        let (db, time_provider) = test_db_partitions(rules, partitions, from_secs(0));
+        let mut registry = TaskRegistry::new(time_provider);
+        let mut lifecycle = LifecyclePolicy::new(&db);
+
+        let (tracker, registration) = registry.register(ChunkLifecycleAction::Persisting);
+
+        // Manually add the tracker to the policy as if a previous invocation
+        // of check_for_work had started a background persist task
+        lifecycle.trackers.push(tracker);
+
+        lifecycle.check_for_work();
+        assert_eq!(*db.events.read(), vec![]);
+
+        // Dropping registration will "complete" the tracker
+        std::mem::drop(registration);
+    }
+
     #[test]
     fn test_persist_empty() {
         let rules = LifecycleRules {