Merge pull request #4941 from influxdata/dom/persist-memory-accounting
fix: account for partition memory until persist is completedpull/24376/head
commit
ce906354f9
ingester/src
|
@ -180,10 +180,7 @@ struct LifecycleState {
|
||||||
|
|
||||||
impl LifecycleState {
|
impl LifecycleState {
|
||||||
fn remove(&mut self, partition_id: &PartitionId) -> Option<PartitionLifecycleStats> {
|
fn remove(&mut self, partition_id: &PartitionId) -> Option<PartitionLifecycleStats> {
|
||||||
self.partition_stats.remove(partition_id).map(|stats| {
|
self.partition_stats.remove(partition_id)
|
||||||
self.total_bytes -= stats.bytes_written;
|
|
||||||
stats
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -352,14 +349,25 @@ impl LifecycleManager {
|
||||||
let persist_tasks: Vec<_> = to_persist
|
let persist_tasks: Vec<_> = to_persist
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|s| {
|
.map(|s| {
|
||||||
self.remove(s.partition_id);
|
// Mark this partition as being persisted, and remember the
|
||||||
|
// memory allocation it had accumulated.
|
||||||
|
let partition_memory_usage = self
|
||||||
|
.remove(s.partition_id)
|
||||||
|
.map(|s| s.bytes_written)
|
||||||
|
.unwrap_or_default();
|
||||||
let persister = Arc::clone(persister);
|
let persister = Arc::clone(persister);
|
||||||
|
|
||||||
let (_tracker, registration) = self.job_registry.register(Job::Persist {
|
let (_tracker, registration) = self.job_registry.register(Job::Persist {
|
||||||
partition_id: s.partition_id,
|
partition_id: s.partition_id,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let state = Arc::clone(&self.state);
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
persister.persist(s.partition_id).await;
|
persister.persist(s.partition_id).await;
|
||||||
|
// Now the data has been uploaded and the memory it was
|
||||||
|
// using has been freed, released the memory capacity back
|
||||||
|
// the ingester.
|
||||||
|
state.lock().total_bytes -= partition_memory_usage;
|
||||||
})
|
})
|
||||||
.track(registration)
|
.track(registration)
|
||||||
})
|
})
|
||||||
|
@ -459,6 +467,7 @@ mod tests {
|
||||||
use iox_time::MockProvider;
|
use iox_time::MockProvider;
|
||||||
use metric::{Attributes, Registry};
|
use metric::{Attributes, Registry};
|
||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
|
use tokio::sync::Barrier;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct TestPersister {
|
struct TestPersister {
|
||||||
|
@ -495,6 +504,91 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
/// Synchronizes waiting on some test event
|
||||||
|
struct EventBarrier {
|
||||||
|
before: Arc<Barrier>,
|
||||||
|
after: Arc<Barrier>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventBarrier {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
before: Arc::new(Barrier::new(2)),
|
||||||
|
after: Arc::new(Barrier::new(2)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This persister will pause after persist is called
|
||||||
|
struct PausablePersister {
|
||||||
|
inner: TestPersister,
|
||||||
|
events: Mutex<BTreeMap<PartitionId, EventBarrier>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PausablePersister {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
inner: TestPersister::default(),
|
||||||
|
events: Mutex::new(BTreeMap::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Persister for PausablePersister {
|
||||||
|
async fn persist(&self, partition_id: PartitionId) {
|
||||||
|
self.inner.persist(partition_id).await;
|
||||||
|
if let Some(event) = self.event(partition_id) {
|
||||||
|
event.before.wait().await;
|
||||||
|
event.after.wait().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_min_unpersisted_sequence_number(
|
||||||
|
&self,
|
||||||
|
sequencer_id: SequencerId,
|
||||||
|
sequence_number: SequenceNumber,
|
||||||
|
) {
|
||||||
|
self.inner
|
||||||
|
.update_min_unpersisted_sequence_number(sequencer_id, sequence_number)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PausablePersister {
|
||||||
|
/// Wait until the persist operation has started
|
||||||
|
async fn wait_for_persist(&self, partition_id: PartitionId) {
|
||||||
|
let event = self
|
||||||
|
.event(partition_id)
|
||||||
|
.expect("partition not configured to wait");
|
||||||
|
event.before.wait().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Allow the persist operation to complete
|
||||||
|
async fn complete_persist(&self, partition_id: PartitionId) {
|
||||||
|
let event = self
|
||||||
|
.take_event(partition_id)
|
||||||
|
.expect("partition not configured to wait");
|
||||||
|
event.after.wait().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// reset so a persist operation can begin again
|
||||||
|
fn pause_next(&self, partition_id: PartitionId) {
|
||||||
|
self.events.lock().insert(partition_id, EventBarrier::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// get the event barrier configured for the specified partition
|
||||||
|
fn event(&self, partition_id: PartitionId) -> Option<EventBarrier> {
|
||||||
|
self.events.lock().get(&partition_id).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// get the event barrier configured for the specified partition removing it
|
||||||
|
fn take_event(&self, partition_id: PartitionId) -> Option<EventBarrier> {
|
||||||
|
self.events.lock().remove(&partition_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn logs_write() {
|
fn logs_write() {
|
||||||
let config = LifecycleConfig {
|
let config = LifecycleConfig {
|
||||||
|
@ -537,8 +631,8 @@ mod tests {
|
||||||
assert_eq!(p2.first_write, Time::from_timestamp_nanos(10));
|
assert_eq!(p2.first_write, Time::from_timestamp_nanos(10));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[tokio::test]
|
||||||
fn pausing_and_resuming_ingest() {
|
async fn pausing_and_resuming_ingest() {
|
||||||
let config = LifecycleConfig {
|
let config = LifecycleConfig {
|
||||||
pause_ingest_size: 20,
|
pause_ingest_size: 20,
|
||||||
persist_memory_threshold: 10,
|
persist_memory_threshold: 10,
|
||||||
|
@ -546,29 +640,71 @@ mod tests {
|
||||||
partition_age_threshold: Duration::from_nanos(0),
|
partition_age_threshold: Duration::from_nanos(0),
|
||||||
partition_cold_threshold: Duration::from_secs(500),
|
partition_cold_threshold: Duration::from_secs(500),
|
||||||
};
|
};
|
||||||
let TestLifecycleManger { m, .. } = TestLifecycleManger::new(config);
|
let partition_id = PartitionId::new(1);
|
||||||
|
let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);
|
||||||
let sequencer_id = SequencerId::new(1);
|
let sequencer_id = SequencerId::new(1);
|
||||||
let h = m.handle();
|
let h = m.handle();
|
||||||
|
|
||||||
assert!(!h.log_write(
|
// write more than the limit (10)
|
||||||
PartitionId::new(1),
|
assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 15));
|
||||||
sequencer_id,
|
|
||||||
SequenceNumber::new(1),
|
|
||||||
15
|
|
||||||
));
|
|
||||||
|
|
||||||
// now it should indicate a pause
|
// all subsequent writes should also indicate a pause
|
||||||
assert!(h.log_write(
|
assert!(h.log_write(partition_id, sequencer_id, SequenceNumber::new(2), 10));
|
||||||
PartitionId::new(1),
|
|
||||||
sequencer_id,
|
|
||||||
SequenceNumber::new(2),
|
|
||||||
10
|
|
||||||
));
|
|
||||||
assert!(!h.can_resume_ingest());
|
assert!(!h.can_resume_ingest());
|
||||||
|
|
||||||
m.remove(PartitionId::new(1));
|
// persist the partition
|
||||||
|
let persister = Arc::new(TestPersister::default());
|
||||||
|
m.maybe_persist(&persister).await;
|
||||||
|
|
||||||
|
// ingest can resume
|
||||||
assert!(h.can_resume_ingest());
|
assert!(h.can_resume_ingest());
|
||||||
assert!(!h.log_write(PartitionId::new(1), sequencer_id, SequenceNumber::new(3), 3));
|
assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(3), 3));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pausing_ingest_waits_until_persist_completes() {
|
||||||
|
let config = LifecycleConfig {
|
||||||
|
pause_ingest_size: 20,
|
||||||
|
persist_memory_threshold: 10,
|
||||||
|
partition_size_threshold: 5,
|
||||||
|
partition_age_threshold: Duration::from_nanos(0),
|
||||||
|
partition_cold_threshold: Duration::from_secs(500),
|
||||||
|
};
|
||||||
|
let partition_id = PartitionId::new(1);
|
||||||
|
let TestLifecycleManger { mut m, .. } = TestLifecycleManger::new(config);
|
||||||
|
let sequencer_id = SequencerId::new(1);
|
||||||
|
let h = m.handle();
|
||||||
|
|
||||||
|
// write more than the limit (20)
|
||||||
|
h.log_write(partition_id, sequencer_id, SequenceNumber::new(1), 25);
|
||||||
|
|
||||||
|
// can not resume ingest as we are overall the pause ingest limit
|
||||||
|
assert!(!h.can_resume_ingest());
|
||||||
|
|
||||||
|
// persist the partition, pausing once it starts
|
||||||
|
let persister = Arc::new(PausablePersister::new());
|
||||||
|
persister.pause_next(partition_id);
|
||||||
|
|
||||||
|
let captured_persister = Arc::clone(&persister);
|
||||||
|
let persist = tokio::task::spawn(async move {
|
||||||
|
m.maybe_persist(&captured_persister).await;
|
||||||
|
m
|
||||||
|
});
|
||||||
|
|
||||||
|
// wait for persist to have started
|
||||||
|
persister.wait_for_persist(partition_id).await;
|
||||||
|
|
||||||
|
// until persist completes, ingest can not proceed (as the
|
||||||
|
// ingester is still over the limit).
|
||||||
|
assert!(!h.can_resume_ingest());
|
||||||
|
|
||||||
|
// allow persist to complete
|
||||||
|
persister.complete_persist(partition_id).await;
|
||||||
|
persist.await.expect("task panic'd");
|
||||||
|
|
||||||
|
// ingest can resume
|
||||||
|
assert!(h.can_resume_ingest());
|
||||||
|
assert!(!h.log_write(partition_id, sequencer_id, SequenceNumber::new(2), 3));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
Loading…
Reference in New Issue