From 9a79d165856de584258dcea21912780ee220dd02 Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Thu, 23 Jun 2022 15:40:51 +0100
Subject: [PATCH 1/3] fix: account for partition memory until persisted

The ingester maintains a rough "total memory in use" counter it uses to
try and limit the amount of memory the ingester is using overall. When a
partition is persisted, this total memory usage value is adjusted to
account for releasing the partition memory.

Prior to this commit, the ordering was:

  * Writes increase the memory counter
  * maybe_persist() is called to trigger persistence
  * A partition is identified for persistence
  * Partition memory usage is released back to the total memory counter
  * Persistence starts

This meant that the partitions in the process of being persisted were
not accounted for in the ingester's total memory counter, and therefore
we could significantly overrun the configured memory limit.

After this commit, the ordering is:

  * Writes increase the memory counter
  * maybe_persist() is called to trigger persistence
  * A partition is identified for persistence
  * Persistence starts
  * Persistence completes
  * Partition memory usage is released back to the total memory counter

This ensures persisting partitions are still tracked in the total memory
counter, causing pauses to correctly fire.
---
 ingester/src/lifecycle.rs | 72 ++++++++++++++++++++++++++-------------
 1 file changed, 49 insertions(+), 23 deletions(-)

diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs
index 119d1c2f82..18f5ead6d9 100644
--- a/ingester/src/lifecycle.rs
+++ b/ingester/src/lifecycle.rs
@@ -180,10 +180,7 @@ struct LifecycleState {
 
 impl LifecycleState {
     fn remove(&mut self, partition_id: &PartitionId) -> Option<PartitionStats> {
-        self.partition_stats.remove(partition_id).map(|stats| {
-            self.total_bytes -= stats.bytes_written;
-            stats
-        })
+        self.partition_stats.remove(partition_id)
     }
 }
 
@@ -352,14 +349,25 @@ impl LifecycleManager {
         let persist_tasks: Vec<_> = to_persist
             .into_iter()
             .map(|s| {
-                self.remove(s.partition_id);
+                // Mark this partition as being persisted, and remember the
+                // memory allocation it had accumulated.
+                let partition_memory_usage = self
+                    .remove(s.partition_id)
+                    .map(|s| s.bytes_written)
+                    .unwrap_or_default();
                 let persister = Arc::clone(persister);
                 let (_tracker, registration) = self.job_registry.register(Job::Persist {
                     partition_id: s.partition_id,
                 });
+
+                let state = Arc::clone(&self.state);
                 tokio::task::spawn(async move {
                     persister.persist(s.partition_id).await;
+                    // Now the data has been uploaded and the memory it was
+                    // using has been freed, release the memory capacity back
+                    // to the ingester.
+                    state.lock().total_bytes -= partition_memory_usage;
                 })
                 .track(registration)
             })
@@ -459,6 +467,7 @@ mod tests {
     use iox_time::MockProvider;
     use metric::{Attributes, Registry};
     use std::collections::BTreeSet;
+    use test_helpers::timeout::FutureTimeout;
 
     #[derive(Default)]
     struct TestPersister {
@@ -537,8 +546,8 @@ mod tests {
         assert_eq!(p2.first_write, Time::from_timestamp_nanos(10));
     }
 
-    #[test]
-    fn pausing_and_resuming_ingest() {
+    #[tokio::test]
+    async fn pausing_and_resuming_ingest() {
         let config = LifecycleConfig {
             pause_ingest_size: 20,
             persist_memory_threshold: 10,
@@ -546,29 +555,46 @@ mod tests {
             partition_age_threshold: Duration::from_nanos(0),
             partition_cold_threshold: Duration::from_secs(500),
         };
-        let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config);
+        let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);
         let sequencer_id = SequencerId::new(1);
         let h = m.handle();
 
-        assert!(!h.log_write(
-            PartitionId::new(1),
-            sequencer_id,
-            SequenceNumber::new(1),
-            15
-        ));
+        let partition_id = PartitionId::new(1);
+
+        assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 15));
 
         // now it should indicate a pause
-        assert!(h.log_write(
-            PartitionId::new(1),
-            sequencer_id,
-            SequenceNumber::new(2),
-            10
-        ));
+        assert!(h.log_write(partition_id, sequencer_id, SequenceNumber::new(2), 10));
         assert!(!h.can_resume_ingest());
 
-        m.remove(PartitionId::new(1));
-        assert!(h.can_resume_ingest());
-        assert!(!h.log_write(PartitionId::new(1), sequencer_id, SequenceNumber::new(3), 3));
+        // Trigger a persistence job that succeeds.
+        let persister = Arc::new(TestPersister::default());
+        m.maybe_persist(&persister).await;
+
+        // Wait for the persist op to complete
+        async {
+            loop {
+                if persister.persist_called_for(partition_id) {
+                    // Persist has completed, but the memory may be released next,
+                    // so loop looking for the ingest resumption signal.
+                    if h.can_resume_ingest() {
+                        // This is the expected case.
+                        //
+                        // Persistence has completed and the memory accounting has
+                        // been adjusted to reflect the freed partition data.
+                        return;
+                    }
+                }
+
+                tokio::time::sleep(Duration::from_millis(50)).await;
+            }
+        }
+        .with_timeout_panic(Duration::from_secs(5))
+        .await;
+
+        // Now the limit has dropped back down, writes should not return
+        // pause=true
+        assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(3), 3));
     }
 
     #[tokio::test]
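
The heart of the fix is the reordering the commit message describes: the
partition's byte count must stay in the shared total until the persist
future resolves. A minimal standalone sketch of that ordering, assuming a
counter behind a mutex (the names `total_bytes`, `partition_bytes` and
`persist` are illustrative, not the actual ingester API):

    use std::sync::{Arc, Mutex};

    /// Sketch: subtract the partition's memory from the shared total only
    /// after the persist future has completed.
    async fn persist_then_release(
        total_bytes: Arc<Mutex<usize>>,
        partition_bytes: usize,
        persist: impl std::future::Future<Output = ()>,
    ) {
        // While the upload runs, the bytes remain accounted for, so the
        // pause-ingest check still sees the in-flight partition.
        persist.await;
        // Only now is the capacity handed back to the ingester.
        *total_bytes.lock().unwrap() -= partition_bytes;
    }

The diff itself writes `state.lock().total_bytes`, suggesting a
parking_lot-style mutex without the `unwrap()`; the sketch keeps only the
ordering, not the surrounding job-registry machinery.
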
From fb4c3ed2944c7a67a11d4c47f8fd1d2d1948e96c Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 23 Jun 2022 11:34:59 -0400
Subject: [PATCH 2/3] fix: revert test change

---
 ingester/src/lifecycle.rs | 54 +++++++++++++--------------------------
 1 file changed, 18 insertions(+), 36 deletions(-)

diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs
index 18f5ead6d9..3bd48317e9 100644
--- a/ingester/src/lifecycle.rs
+++ b/ingester/src/lifecycle.rs
@@ -467,7 +467,6 @@ mod tests {
     use iox_time::MockProvider;
     use metric::{Attributes, Registry};
     use std::collections::BTreeSet;
-    use test_helpers::timeout::FutureTimeout;
 
     #[derive(Default)]
     struct TestPersister {
@@ -546,8 +545,8 @@ mod tests {
         assert_eq!(p2.first_write, Time::from_timestamp_nanos(10));
     }
 
-    #[tokio::test]
-    async fn pausing_and_resuming_ingest() {
+    #[test]
+    fn pausing_and_resuming_ingest() {
         let config = LifecycleConfig {
             pause_ingest_size: 20,
             persist_memory_threshold: 10,
@@ -555,46 +554,29 @@ mod tests {
             partition_age_threshold: Duration::from_nanos(0),
             partition_cold_threshold: Duration::from_secs(500),
         };
-        let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);
+        let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config);
         let sequencer_id = SequencerId::new(1);
         let h = m.handle();
 
-        let partition_id = PartitionId::new(1);
-
-        assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 15));
+        assert!(!h.log_write(
+            PartitionId::new(1),
+            sequencer_id,
+            SequenceNumber::new(1),
+            15
+        ));
 
         // now it should indicate a pause
-        assert!(h.log_write(partition_id, sequencer_id, SequenceNumber::new(2), 10));
+        assert!(h.log_write(
+            PartitionId::new(1),
+            sequencer_id,
+            SequenceNumber::new(2),
+            10
+        ));
         assert!(!h.can_resume_ingest());
 
-        // Trigger a persistence job that succeeds.
-        let persister = Arc::new(TestPersister::default());
-        m.maybe_persist(&persister).await;
-
-        // Wait for the persist op to complete
-        async {
-            loop {
-                if persister.persist_called_for(partition_id) {
-                    // Persist has completed, but the memory may be released next,
-                    // so loop looking for the ingest resumption signal.
-                    if h.can_resume_ingest() {
-                        // This is the expected case.
-                        //
-                        // Persistence has completed and the memory accounting has
-                        // been adjusted to reflect the freed partition data.
-                        return;
-                    }
-                }
-
-                tokio::time::sleep(Duration::from_millis(50)).await;
-            }
-        }
-        .with_timeout_panic(Duration::from_secs(5))
-        .await;
-
-        // Now the limit has dropped back down, writes should not return
-        // pause=true
-        assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(3), 3));
+        m.remove(PartitionId::new(1));
+        assert!(h.can_resume_ingest());
+        assert!(!h.log_write(PartitionId::new(1), sequencer_id, SequenceNumber::new(3), 3));
     }
 
     #[tokio::test]
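
The test reverted above waited for the background memory release by
polling `can_resume_ingest()` under a timeout. If the
`test_helpers::timeout::FutureTimeout` combinator is unavailable, the
same pattern can be expressed with tokio alone; a sketch, where
`wait_for` and the 5s/50ms values are illustrative choices rather than
anything from the patch:

    use std::time::Duration;

    /// Poll `cond` every 50ms, panicking if it is not true within 5s.
    async fn wait_for(mut cond: impl FnMut() -> bool) {
        tokio::time::timeout(Duration::from_secs(5), async {
            while !cond() {
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        })
        .await
        .expect("condition not met within 5s");
    }

The follow-up patch below avoids polling altogether: it pauses the
persister on a barrier, so the test can deterministically observe the
in-flight state instead of racing a sleep loop.
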
From 49b34e11356fec54ee51d262680d5949884ef1a7 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 23 Jun 2022 11:50:55 -0400
Subject: [PATCH 3/3] test: add appropriate tests

---
 ingester/src/lifecycle.rs | 164 +++++++++++++++++++++++++++++++++-----
 1 file changed, 146 insertions(+), 18 deletions(-)

diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs
index 3bd48317e9..e3560d7ce5 100644
--- a/ingester/src/lifecycle.rs
+++ b/ingester/src/lifecycle.rs
@@ -467,6 +467,7 @@ mod tests {
     use iox_time::MockProvider;
     use metric::{Attributes, Registry};
     use std::collections::BTreeSet;
+    use tokio::sync::Barrier;
 
     #[derive(Default)]
     struct TestPersister {
@@ -503,6 +504,91 @@ mod tests {
         }
     }
 
+    #[derive(Debug, Clone)]
+    /// Synchronizes waiting on some test event
+    struct EventBarrier {
+        before: Arc<Barrier>,
+        after: Arc<Barrier>,
+    }
+
+    impl EventBarrier {
+        fn new() -> Self {
+            Self {
+                before: Arc::new(Barrier::new(2)),
+                after: Arc::new(Barrier::new(2)),
+            }
+        }
+    }
+
+    /// This persister will pause after persist is called
+    struct PausablePersister {
+        inner: TestPersister,
+        events: Mutex<BTreeMap<PartitionId, EventBarrier>>,
+    }
+
+    impl PausablePersister {
+        fn new() -> Self {
+            Self {
+                inner: TestPersister::default(),
+                events: Mutex::new(BTreeMap::new()),
+            }
+        }
+    }
+
+    #[async_trait]
+    impl Persister for PausablePersister {
+        async fn persist(&self, partition_id: PartitionId) {
+            self.inner.persist(partition_id).await;
+            if let Some(event) = self.event(partition_id) {
+                event.before.wait().await;
+                event.after.wait().await;
+            }
+        }
+
+        async fn update_min_unpersisted_sequence_number(
+            &self,
+            sequencer_id: SequencerId,
+            sequence_number: SequenceNumber,
+        ) {
+            self.inner
+                .update_min_unpersisted_sequence_number(sequencer_id, sequence_number)
+                .await
+        }
+    }
+
+    impl PausablePersister {
+        /// Wait until the persist operation has started
+        async fn wait_for_persist(&self, partition_id: PartitionId) {
+            let event = self
+                .event(partition_id)
+                .expect("partition not configured to wait");
+            event.before.wait().await;
+        }
+
+        /// Allow the persist operation to complete
+        async fn complete_persist(&self, partition_id: PartitionId) {
+            let event = self
+                .take_event(partition_id)
+                .expect("partition not configured to wait");
+            event.after.wait().await;
+        }
+
+        /// configure the next persist operation for this partition to pause
+        fn pause_next(&self, partition_id: PartitionId) {
+            self.events.lock().insert(partition_id, EventBarrier::new());
+        }
+
+        /// get the event barrier configured for the specified partition
+        fn event(&self, partition_id: PartitionId) -> Option<EventBarrier> {
+            self.events.lock().get(&partition_id).cloned()
+        }
+
+        /// get the event barrier configured for the specified partition, removing it
+        fn take_event(&self, partition_id: PartitionId) -> Option<EventBarrier> {
+            self.events.lock().remove(&partition_id)
+        }
+    }
+
     #[test]
     fn logs_write() {
         let config = LifecycleConfig {
@@ -545,8 +631,8 @@ mod tests {
         assert_eq!(p2.first_write, Time::from_timestamp_nanos(10));
     }
 
-    #[test]
-    fn pausing_and_resuming_ingest() {
+    #[tokio::test]
+    async fn pausing_and_resuming_ingest() {
         let config = LifecycleConfig {
             pause_ingest_size: 20,
             persist_memory_threshold: 10,
@@ -554,29 +640,71 @@ mod tests {
             partition_age_threshold: Duration::from_nanos(0),
             partition_cold_threshold: Duration::from_secs(500),
         };
-        let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config);
+        let partition_id = PartitionId::new(1);
+        let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);
         let sequencer_id = SequencerId::new(1);
         let h = m.handle();
 
-        assert!(!h.log_write(
-            PartitionId::new(1),
-            sequencer_id,
-            SequenceNumber::new(1),
-            15
-        ));
+        // write more than the limit (10)
+        assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 15));
 
-        // now it should indicate a pause
-        assert!(h.log_write(
-            PartitionId::new(1),
-            sequencer_id,
-            SequenceNumber::new(2),
-            10
-        ));
+        // all subsequent writes should also indicate a pause
+        assert!(h.log_write(partition_id, sequencer_id, SequenceNumber::new(2), 10));
         assert!(!h.can_resume_ingest());
 
-        m.remove(PartitionId::new(1));
+        // persist the partition
+        let persister = Arc::new(TestPersister::default());
+        m.maybe_persist(&persister).await;
+
+        // ingest can resume
         assert!(h.can_resume_ingest());
-        assert!(!h.log_write(PartitionId::new(1), sequencer_id, SequenceNumber::new(3), 3));
+        assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(3), 3));
+    }
+
+    #[tokio::test]
+    async fn pausing_ingest_waits_until_persist_completes() {
+        let config = LifecycleConfig {
+            pause_ingest_size: 20,
+            persist_memory_threshold: 10,
+            partition_size_threshold: 5,
+            partition_age_threshold: Duration::from_nanos(0),
+            partition_cold_threshold: Duration::from_secs(500),
+        };
+        let partition_id = PartitionId::new(1);
+        let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);
+        let sequencer_id = SequencerId::new(1);
+        let h = m.handle();
+
+        // write more than the limit (20)
+        h.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 25);
+
+        // can not resume ingest as we are over the pause ingest limit
+        assert!(!h.can_resume_ingest());
+
+        // persist the partition, pausing once it starts
+        let persister = Arc::new(PausablePersister::new());
+        persister.pause_next(partition_id);
+
+        let captured_persister = Arc::clone(&persister);
+        let persist = tokio::task::spawn(async move {
+            m.maybe_persist(&captured_persister).await;
+            m
+        });
+
+        // wait for persist to have started
+        persister.wait_for_persist(partition_id).await;
+
+        // until persist completes, ingest can not proceed (as the
+        // ingester is still over the limit).
+        assert!(!h.can_resume_ingest());
+
+        // allow persist to complete
+        persister.complete_persist(partition_id).await;
+        persist.await.expect("task panic'd");
+
+        // ingest can resume
+        assert!(h.can_resume_ingest());
+        assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(2), 3));
    }
 
     #[tokio::test]
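
The EventBarrier pair added above is a reusable trick for freezing an
async operation at a known point: the operation waits on `before` to
announce it has started, then on `after` until the test releases it. A
self-contained sketch of the handshake (illustrative names, assuming a
multi-threaded tokio runtime):

    use std::sync::Arc;
    use tokio::sync::Barrier;

    #[tokio::main]
    async fn main() {
        // Two participants per barrier: the paused operation and the test.
        let before = Arc::new(Barrier::new(2));
        let after = Arc::new(Barrier::new(2));

        let (b, a) = (Arc::clone(&before), Arc::clone(&after));
        let op = tokio::spawn(async move {
            b.wait().await; // announce: reached the pause point
            a.wait().await; // block until the test releases us
            // the work that must not complete early (e.g. freeing memory)
        });

        before.wait().await; // the operation is now provably in flight
        // assertions about the paused state go here, e.g. "still over limit"
        after.wait().await; // release it
        op.await.unwrap();
    }

Because every `wait()` requires both sides, the test neither sleeps nor
races: `pausing_ingest_waits_until_persist_completes` can assert
`!h.can_resume_ingest()` while the persist job is guaranteed to be
mid-flight, which is exactly the window the original polling test could
not pin down.
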