fix: account for partition memory until persisted
The ingester maintains a rough "total memory in use" counter it uses to try and limit the amount of memory the ingester is using overall. When a partition is persisted, this total memory usage value is adjusted to account for releasing the partition memory. Prior to this commit, the ordering was: * Writes increase the memory counter * maybe_persist() is called to trigger persistence * A partition is identified for persistence * Partition memory usage is released back to the total memory counter * Persistence starts This meant that the partitions in the process of being persisted were not accounted for in the ingester's total memory counter, and therefore we could significantly overrun the configured memory limit. After this commit, the ordering is: * Writes increase the memory counter * maybe_persist() is called to trigger persistence * A partition is identified for persistence * Persistence starts * Persistence completes * Partition memory usage is released back to the total memory counter This ensures persisting partitions are sill tracked in the total memory counter, causing pauses to correctly fire.pull/24376/head
parent
bd6c4659af
commit
9a79d16585
|
@ -180,10 +180,7 @@ struct LifecycleState {
|
|||
|
||||
impl LifecycleState {
|
||||
fn remove(&mut self, partition_id: &PartitionId) -> Option<PartitionLifecycleStats> {
|
||||
self.partition_stats.remove(partition_id).map(|stats| {
|
||||
self.total_bytes -= stats.bytes_written;
|
||||
stats
|
||||
})
|
||||
self.partition_stats.remove(partition_id)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -352,14 +349,25 @@ impl LifecycleManager {
|
|||
let persist_tasks: Vec<_> = to_persist
|
||||
.into_iter()
|
||||
.map(|s| {
|
||||
self.remove(s.partition_id);
|
||||
// Mark this partition as being persisted, and remember the
|
||||
// memory allocation it had accumulated.
|
||||
let partition_memory_usage = self
|
||||
.remove(s.partition_id)
|
||||
.map(|s| s.bytes_written)
|
||||
.unwrap_or_default();
|
||||
let persister = Arc::clone(persister);
|
||||
|
||||
let (_tracker, registration) = self.job_registry.register(Job::Persist {
|
||||
partition_id: s.partition_id,
|
||||
});
|
||||
|
||||
let state = Arc::clone(&self.state);
|
||||
tokio::task::spawn(async move {
|
||||
persister.persist(s.partition_id).await;
|
||||
// Now the data has been uploaded and the memory it was
|
||||
// using has been freed, released the memory capacity back
|
||||
// the ingester.
|
||||
state.lock().total_bytes -= partition_memory_usage;
|
||||
})
|
||||
.track(registration)
|
||||
})
|
||||
|
@ -459,6 +467,7 @@ mod tests {
|
|||
use iox_time::MockProvider;
|
||||
use metric::{Attributes, Registry};
|
||||
use std::collections::BTreeSet;
|
||||
use test_helpers::timeout::FutureTimeout;
|
||||
|
||||
#[derive(Default)]
|
||||
struct TestPersister {
|
||||
|
@ -537,8 +546,8 @@ mod tests {
|
|||
assert_eq!(p2.first_write, Time::from_timestamp_nanos(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pausing_and_resuming_ingest() {
|
||||
#[tokio::test]
|
||||
async fn pausing_and_resuming_ingest() {
|
||||
let config = LifecycleConfig {
|
||||
pause_ingest_size: 20,
|
||||
persist_memory_threshold: 10,
|
||||
|
@ -546,29 +555,46 @@ mod tests {
|
|||
partition_age_threshold: Duration::from_nanos(0),
|
||||
partition_cold_threshold: Duration::from_secs(500),
|
||||
};
|
||||
let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config);
|
||||
let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);
|
||||
let sequencer_id = SequencerId::new(1);
|
||||
let h = m.handle();
|
||||
|
||||
assert!(!h.log_write(
|
||||
PartitionId::new(1),
|
||||
sequencer_id,
|
||||
SequenceNumber::new(1),
|
||||
15
|
||||
));
|
||||
let partition_id = PartitionId::new(1);
|
||||
|
||||
assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 15));
|
||||
|
||||
// now it should indicate a pause
|
||||
assert!(h.log_write(
|
||||
PartitionId::new(1),
|
||||
sequencer_id,
|
||||
SequenceNumber::new(2),
|
||||
10
|
||||
));
|
||||
assert!(h.log_write(partition_id, sequencer_id, SequenceNumber::new(2), 10));
|
||||
assert!(!h.can_resume_ingest());
|
||||
|
||||
m.remove(PartitionId::new(1));
|
||||
assert!(h.can_resume_ingest());
|
||||
assert!(!h.log_write(PartitionId::new(1), sequencer_id, SequenceNumber::new(3), 3));
|
||||
// Trigger a persistence job that succeeds.
|
||||
let persister = Arc::new(TestPersister::default());
|
||||
m.maybe_persist(&persister).await;
|
||||
|
||||
// Wait for the persist op to complete
|
||||
async {
|
||||
loop {
|
||||
if persister.persist_called_for(partition_id) {
|
||||
// Persist has completed, but the memory may be released next,
|
||||
// so loop looking for the ingest resumption signal.
|
||||
if h.can_resume_ingest() {
|
||||
// This is the expected case.
|
||||
//
|
||||
// Persistence has completed and the memory accounting has
|
||||
// been adjusted to reflect the freed partition data.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
}
|
||||
.with_timeout_panic(Duration::from_secs(5))
|
||||
.await;
|
||||
|
||||
// Now the limit has dropped back down, writes should not return
|
||||
// pause=true
|
||||
assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(3), 3));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
Loading…
Reference in New Issue