fix: hard buffer limits around write buffer consumption
- when reading entries from write buffer during normal playback, do not throw away entries when hitting the hard buffer limit; instead wait for compaction to sort it out
- during playback, wait for compaction
parent
9ea04a42ff
commit
3ac88ffc49
|
@@ -897,16 +897,27 @@ impl Db {
|
||||||
let sequenced_entry = Arc::new(sequenced_entry);
|
let sequenced_entry = Arc::new(sequenced_entry);
|
||||||
|
|
||||||
// store entry
|
// store entry
|
||||||
match self.store_sequenced_entry(Arc::clone(&sequenced_entry)) {
|
loop {
|
||||||
Ok(_) => {
|
match self.store_sequenced_entry(Arc::clone(&sequenced_entry)) {
|
||||||
red_observation.ok();
|
Ok(_) => {
|
||||||
}
|
red_observation.ok();
|
||||||
Err(e) => {
|
break;
|
||||||
debug!(
|
}
|
||||||
?e,
|
Err(Error::HardLimitReached {}) => {
|
||||||
"Error storing SequencedEntry from write buffer in database"
|
// wait a bit and retry
|
||||||
);
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
red_observation.error();
|
continue;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
debug!(
|
||||||
|
?e,
|
||||||
|
"Error storing SequencedEntry from write buffer in database"
|
||||||
|
);
|
||||||
|
red_observation.error();
|
||||||
|
|
||||||
|
// no retry
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@@ -1,6 +1,7 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, BTreeSet},
|
collections::{BTreeMap, BTreeSet},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
|
@@ -187,9 +188,24 @@ pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
let entry = Arc::new(entry);
|
let entry = Arc::new(entry);
|
||||||
db.store_sequenced_entry(entry)
|
let n_tries = 100;
|
||||||
.map_err(Box::new)
|
for n_try in 1..=n_tries {
|
||||||
.context(StoreError { sequencer_id })?;
|
match db.store_sequenced_entry(Arc::clone(&entry)) {
|
||||||
|
Ok(_) => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(crate::db::Error::HardLimitReached {}) if n_try < n_tries => {
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Err(Error::StoreError {
|
||||||
|
sequencer_id,
|
||||||
|
source: Box::new(e),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// done replaying?
|
// done replaying?
|
||||||
if sequence.number == min_max.max() {
|
if sequence.number == min_max.max() {
|
||||||
|
@@ -209,7 +225,7 @@ mod tests {
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
convert::TryFrom,
|
convert::TryFrom,
|
||||||
num::NonZeroU32,
|
num::{NonZeroU32, NonZeroUsize},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
@@ -477,6 +493,7 @@ mod tests {
|
||||||
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
|
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
|
||||||
)))
|
)))
|
||||||
.lifecycle_rules(data_types::database_rules::LifecycleRules {
|
.lifecycle_rules(data_types::database_rules::LifecycleRules {
|
||||||
|
buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
|
||||||
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
|
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
|
@@ -1234,6 +1251,52 @@ mod tests {
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn replay_compacts() {
|
||||||
|
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big, it still takes ~10s
|
||||||
|
// :(
|
||||||
|
let n_entries = 400u64;
|
||||||
|
let sequenced_entries: Vec<_> = (0..n_entries)
|
||||||
|
.map(|sequence_number| {
|
||||||
|
let lp = format!(
|
||||||
|
"table_1,tag_partition_by=a foo=\"hello\",bar=10 {}",
|
||||||
|
sequence_number
|
||||||
|
);
|
||||||
|
let lp: &'static str = Box::leak(Box::new(lp));
|
||||||
|
TestSequencedEntry {
|
||||||
|
sequencer_id: 0,
|
||||||
|
sequence_number,
|
||||||
|
lp,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
ReplayTest {
|
||||||
|
n_sequencers: 1,
|
||||||
|
steps: vec![
|
||||||
|
Step::Ingest(sequenced_entries),
|
||||||
|
Step::Ingest(vec![TestSequencedEntry {
|
||||||
|
sequencer_id: 0,
|
||||||
|
sequence_number: n_entries,
|
||||||
|
lp: "table_2,tag_partition_by=a bar=11 10",
|
||||||
|
}]),
|
||||||
|
Step::Await(vec![Check::Partitions(vec![
|
||||||
|
("table_1", "tag_partition_by_a"),
|
||||||
|
("table_2", "tag_partition_by_a"),
|
||||||
|
])]),
|
||||||
|
Step::Persist(vec![("table_2", "tag_partition_by_a")]),
|
||||||
|
Step::Restart,
|
||||||
|
Step::Assert(vec![Check::Partitions(vec![(
|
||||||
|
"table_2",
|
||||||
|
"tag_partition_by_a",
|
||||||
|
)])]),
|
||||||
|
Step::Replay,
|
||||||
|
],
|
||||||
|
}
|
||||||
|
.run()
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn replay_fail_sequencers_change() {
|
async fn replay_fail_sequencers_change() {
|
||||||
// create write buffer w/ sequencer 0 and 1
|
// create write buffer w/ sequencer 0 and 1
|
||||||
|
|
Loading…
Reference in New Issue