diff --git a/server/src/db.rs b/server/src/db.rs
index 32d5ef9a78..91ac425b64 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -897,16 +897,27 @@ impl Db {
             let sequenced_entry = Arc::new(sequenced_entry);
 
             // store entry
-            match self.store_sequenced_entry(Arc::clone(&sequenced_entry)) {
-                Ok(_) => {
-                    red_observation.ok();
-                }
-                Err(e) => {
-                    debug!(
-                        ?e,
-                        "Error storing SequencedEntry from write buffer in database"
-                    );
-                    red_observation.error();
+            loop {
+                match self.store_sequenced_entry(Arc::clone(&sequenced_entry)) {
+                    Ok(_) => {
+                        red_observation.ok();
+                        break;
+                    }
+                    Err(Error::HardLimitReached {}) => {
+                        // wait a bit and retry
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                        continue;
+                    }
+                    Err(e) => {
+                        debug!(
+                            ?e,
+                            "Error storing SequencedEntry from write buffer in database"
+                        );
+                        red_observation.error();
+
+                        // no retry
+                        break;
+                    }
                 }
             }
 
diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs
index 07f97ced64..9237afe0fc 100644
--- a/server/src/db/replay.rs
+++ b/server/src/db/replay.rs
@@ -1,6 +1,7 @@
 use std::{
     collections::{BTreeMap, BTreeSet},
     sync::Arc,
+    time::Duration,
 };
 
 use futures::TryStreamExt;
@@ -187,9 +188,24 @@ pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> {
                 }
 
                 let entry = Arc::new(entry);
-                db.store_sequenced_entry(entry)
-                    .map_err(Box::new)
-                    .context(StoreError { sequencer_id })?;
+                let n_tries = 100;
+                for n_try in 1..=n_tries {
+                    match db.store_sequenced_entry(Arc::clone(&entry)) {
+                        Ok(_) => {
+                            break;
+                        }
+                        Err(crate::db::Error::HardLimitReached {}) if n_try < n_tries => {
+                            tokio::time::sleep(Duration::from_millis(100)).await;
+                            continue;
+                        }
+                        Err(e) => {
+                            return Err(Error::StoreError {
+                                sequencer_id,
+                                source: Box::new(e),
+                            });
+                        }
+                    }
+                }
 
                 // done replaying?
                 if sequence.number == min_max.max() {
@@ -209,7 +225,7 @@ mod tests {
 
     use std::{
         convert::TryFrom,
-        num::NonZeroU32,
+        num::{NonZeroU32, NonZeroUsize},
         sync::Arc,
         time::{Duration, Instant},
     };
@@ -477,6 +493,7 @@ mod tests {
                 tokio::sync::Mutex::new(Box::new(write_buffer) as _),
             )))
             .lifecycle_rules(data_types::database_rules::LifecycleRules {
+                buffer_size_hard: Some(NonZeroUsize::new(10_000).unwrap()),
                 late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
                 ..Default::default()
             })
@@ -1234,6 +1251,52 @@ mod tests {
             .await;
     }
 
+    #[tokio::test]
+    async fn replay_compacts() {
+        // these numbers are handtuned to trigger hard buffer limits w/o making the test too big, it still takes ~10s
+        // :(
+        let n_entries = 400u64;
+        let sequenced_entries: Vec<_> = (0..n_entries)
+            .map(|sequence_number| {
+                let lp = format!(
+                    "table_1,tag_partition_by=a foo=\"hello\",bar=10 {}",
+                    sequence_number
+                );
+                let lp: &'static str = Box::leak(Box::new(lp));
+                TestSequencedEntry {
+                    sequencer_id: 0,
+                    sequence_number,
+                    lp,
+                }
+            })
+            .collect();
+
+        ReplayTest {
+            n_sequencers: 1,
+            steps: vec![
+                Step::Ingest(sequenced_entries),
+                Step::Ingest(vec![TestSequencedEntry {
+                    sequencer_id: 0,
+                    sequence_number: n_entries,
+                    lp: "table_2,tag_partition_by=a bar=11 10",
+                }]),
+                Step::Await(vec![Check::Partitions(vec![
+                    ("table_1", "tag_partition_by_a"),
+                    ("table_2", "tag_partition_by_a"),
+                ])]),
+                Step::Persist(vec![("table_2", "tag_partition_by_a")]),
+                Step::Restart,
+                Step::Assert(vec![Check::Partitions(vec![(
+                    "table_2",
+                    "tag_partition_by_a",
+                )])]),
+                Step::Replay,
+            ],
+        }
+        .run()
+        .await;
+    }
+
     #[tokio::test]
     async fn replay_fail_sequencers_change() {
         // create write buffer w/ sequencer 0 and 1
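Note on the two hunks above: the write-buffer consumer in db.rs retries `HardLimitReached` indefinitely (effectively applying backpressure until the lifecycle frees buffer space), while replay caps retries at 100 so a database that can never compact below its hard limit fails startup instead of hanging. Both call sites hand-roll the same sleep-and-retry loop; below is a minimal sketch of how it could be factored into a shared helper. The helper name `retry_on_transient`, its signature, and the standalone error type are assumptions for illustration, not part of this patch.

    use std::time::Duration;

    /// Retry a fallible operation up to `n_tries` times, sleeping `backoff`
    /// between attempts whenever `is_transient` classifies the error as
    /// retryable. The last error is returned unchanged once tries run out.
    async fn retry_on_transient<T, E>(
        n_tries: usize,
        backoff: Duration,
        is_transient: impl Fn(&E) -> bool,
        mut op: impl FnMut() -> Result<T, E>,
    ) -> Result<T, E> {
        assert!(n_tries >= 1, "need at least one attempt");
        for n_try in 1..=n_tries {
            match op() {
                Ok(v) => return Ok(v),
                // transient error with tries left: back off and go around again
                Err(e) if n_try < n_tries && is_transient(&e) => {
                    tokio::time::sleep(backoff).await;
                }
                // permanent error, or transient on the final try: give up
                Err(e) => return Err(e),
            }
        }
        unreachable!("loop always returns on the final iteration")
    }

With such a helper, the replay call site would shrink to roughly `retry_on_transient(100, Duration::from_millis(100), |e| matches!(e, crate::db::Error::HardLimitReached {}), || db.store_sequenced_entry(Arc::clone(&entry))).await`, and the write-buffer loop could pass `usize::MAX` to keep its retry-forever behaviour.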