From 9a79d165856de584258dcea21912780ee220dd02 Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Thu, 23 Jun 2022 15:40:51 +0100
Subject: [PATCH] fix: account for partition memory until persisted

The ingester maintains a rough "total memory in use" counter it uses to
try and limit the amount of memory the ingester is using overall.

When a partition is persisted, this total memory usage value is
adjusted to account for releasing the partition memory.

Prior to this commit, the ordering was:

  * Writes increase the memory counter
  * maybe_persist() is called to trigger persistence
  * A partition is identified for persistence
  * Partition memory usage is released back to the total memory counter
  * Persistence starts

This meant that the partitions in the process of being persisted were
not accounted for in the ingester's total memory counter, and therefore
we could significantly overrun the configured memory limit.

After this commit, the ordering is:

  * Writes increase the memory counter
  * maybe_persist() is called to trigger persistence
  * A partition is identified for persistence
  * Persistence starts
  * Persistence completes
  * Partition memory usage is released back to the total memory counter

This ensures persisting partitions are still tracked in the total
memory counter, causing pauses to correctly fire.
---
 ingester/src/lifecycle.rs | 72 ++++++++++++++++++++++++++-------------
 1 file changed, 49 insertions(+), 23 deletions(-)

diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs
index 119d1c2f82..18f5ead6d9 100644
--- a/ingester/src/lifecycle.rs
+++ b/ingester/src/lifecycle.rs
@@ -180,10 +180,7 @@ struct LifecycleState {
 
 impl LifecycleState {
     fn remove(&mut self, partition_id: &PartitionId) -> Option {
-        self.partition_stats.remove(partition_id).map(|stats| {
-            self.total_bytes -= stats.bytes_written;
-            stats
-        })
+        self.partition_stats.remove(partition_id)
     }
 }
 
@@ -352,14 +349,25 @@ impl LifecycleManager {
         let persist_tasks: Vec<_> = to_persist
             .into_iter()
             .map(|s| {
-                self.remove(s.partition_id);
+                // Mark this partition as being persisted, and remember the
+                // memory allocation it had accumulated.
+                let partition_memory_usage = self
+                    .remove(s.partition_id)
+                    .map(|s| s.bytes_written)
+                    .unwrap_or_default();
                 let persister = Arc::clone(persister);
 
                 let (_tracker, registration) = self.job_registry.register(Job::Persist {
                     partition_id: s.partition_id,
                 });
+
+                let state = Arc::clone(&self.state);
                 tokio::task::spawn(async move {
                     persister.persist(s.partition_id).await;
+                    // Now the data has been uploaded and the memory it was
+                    // using has been freed, release the memory capacity back
+                    // to the ingester.
+                    state.lock().total_bytes -= partition_memory_usage;
                 })
                 .track(registration)
             })
@@ -459,6 +467,7 @@ mod tests {
     use iox_time::MockProvider;
     use metric::{Attributes, Registry};
     use std::collections::BTreeSet;
+    use test_helpers::timeout::FutureTimeout;
 
     #[derive(Default)]
     struct TestPersister {
@@ -537,8 +546,8 @@
         assert_eq!(p2.first_write, Time::from_timestamp_nanos(10));
     }
 
-    #[test]
-    fn pausing_and_resuming_ingest() {
+    #[tokio::test]
+    async fn pausing_and_resuming_ingest() {
         let config = LifecycleConfig {
             pause_ingest_size: 20,
             persist_memory_threshold: 10,
@@ -546,29 +555,46 @@
             partition_age_threshold: Duration::from_nanos(0),
             partition_cold_threshold: Duration::from_secs(500),
         };
-        let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config);
+        let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);
         let sequencer_id = SequencerId::new(1);
         let h = m.handle();
 
-        assert!(!h.log_write(
-            PartitionId::new(1),
-            sequencer_id,
-            SequenceNumber::new(1),
-            15
-        ));
+        let partition_id = PartitionId::new(1);
+
+        assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 15));
 
         // now it should indicate a pause
-        assert!(h.log_write(
-            PartitionId::new(1),
-            sequencer_id,
-            SequenceNumber::new(2),
-            10
-        ));
+        assert!(h.log_write(partition_id, sequencer_id, SequenceNumber::new(2), 10));
         assert!(!h.can_resume_ingest());
 
-        m.remove(PartitionId::new(1));
-        assert!(h.can_resume_ingest());
-        assert!(!h.log_write(PartitionId::new(1), sequencer_id, SequenceNumber::new(3), 3));
+        // Trigger a persistence job that succeeds.
+        let persister = Arc::new(TestPersister::default());
+        m.maybe_persist(&persister).await;
+
+        // Wait for the persist op to complete
+        async {
+            loop {
+                if persister.persist_called_for(partition_id) {
+                    // Persist has completed, but the memory may be released next,
+                    // so loop looking for the ingest resumption signal.
+                    if h.can_resume_ingest() {
+                        // This is the expected case.
+                        //
+                        // Persistence has completed and the memory accounting has
+                        // been adjusted to reflect the freed partition data.
+                        return;
+                    }
+                }
+
+                tokio::time::sleep(Duration::from_millis(50)).await;
+            }
+        }
+        .with_timeout_panic(Duration::from_secs(5))
+        .await;
+
+        // Now the limit has dropped back down, writes should not return
+        // pause=true
+        assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(3), 3));
    }
 
     #[tokio::test]
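
The core pattern this patch establishes can be sketched in isolation: decrement
the shared byte counter only after the persist future resolves, so a partition
that is mid-persist still counts towards the pause threshold. The following
minimal sketch is illustrative only, not the IOx implementation; it assumes a
tokio runtime with the macros and time features enabled, and the persist()
helper and AtomicUsize counter are stand-ins for the ingester's persister and
its mutex-guarded LifecycleState.

    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };
    use std::time::Duration;

    // Stand-in for the real persist operation (uploading partition data).
    async fn persist(partition_id: u64) {
        tokio::time::sleep(Duration::from_millis(10)).await;
        println!("persisted partition {partition_id}");
    }

    #[tokio::main]
    async fn main() {
        // Shared "total memory in use" counter, analogous to total_bytes.
        let total_bytes = Arc::new(AtomicUsize::new(0));

        // A write lands and is accounted for immediately.
        let partition_memory_usage = 1024_usize;
        total_bytes.fetch_add(partition_memory_usage, Ordering::SeqCst);

        // Persistence is triggered. Note the counter is NOT decremented here:
        // the partition's buffers remain resident until the upload completes.
        let counter = Arc::clone(&total_bytes);
        let task = tokio::task::spawn(async move {
            persist(1).await;

            // Only once the data has been uploaded (and its buffers can be
            // dropped) is the capacity released back to the ingester.
            counter.fetch_sub(partition_memory_usage, Ordering::SeqCst);
        });

        task.await.unwrap();
        assert_eq!(total_bytes.load(Ordering::SeqCst), 0);
    }

The test added above verifies the same property end-to-end: ingest pauses once
the threshold is hit, and only resumes after the persist job completes and the
memory accounting has been adjusted.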