From 164c6e374342a116b75aa969cd5b07abba5c0ea3 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Wed, 4 Aug 2021 11:49:05 +0200
Subject: [PATCH] feat: improve hard buffer logging and use that as test assertions

---
 server/src/db.rs        | 38 +++++++++++++++++++++++++++++++++-----
 server/src/db/replay.rs | 21 ++++++++++++++++++++-
 2 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/server/src/db.rs b/server/src/db.rs
index 088c741e9e..acdfaf89bc 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -853,6 +853,7 @@ impl Db {
             for (sequencer_id, stream) in write_buffer.streams() {
                 let metrics = self.ingest_metrics.new_sequencer_metrics(sequencer_id);
                 let fut = self.stream_in_sequenced_entries(
+                    sequencer_id,
                     stream.stream,
                     stream.fetch_high_watermark,
                     metrics,
@@ -875,10 +876,12 @@ impl Db {
     /// streaming entries from a write buffer.
     async fn stream_in_sequenced_entries<'a>(
         &'a self,
+        sequencer_id: u32,
         mut stream: BoxStream<'a, Result<SequencedEntry, WriteBufferError>>,
         f_mark: FetchHighWatermark<'a>,
         mut metrics: SequencerMetrics,
     ) {
+        let db_name = self.rules.read().db_name().to_string();
         let mut watermark_last_updated: Option<Instant> = None;
         let mut watermark = 0;
 
@@ -889,7 +892,12 @@
             let sequenced_entry = match sequenced_entry_result {
                 Ok(sequenced_entry) => sequenced_entry,
                 Err(e) => {
-                    debug!(?e, "Error converting write buffer data to SequencedEntry");
+                    debug!(
+                        %e,
+                        %db_name,
+                        sequencer_id,
+                        "Error converting write buffer data to SequencedEntry",
+                    );
                     red_observation.client_error();
                     continue;
                 }
@@ -897,6 +905,7 @@
             let sequenced_entry = Arc::new(sequenced_entry);
 
             // store entry
+            let mut logged_hard_limit = false;
             loop {
                 match self.store_sequenced_entry(Arc::clone(&sequenced_entry)) {
                     Ok(_) => {
@@ -905,12 +914,22 @@
                     }
                     Err(Error::HardLimitReached {}) => {
                         // wait a bit and retry
+                        if !logged_hard_limit {
+                            info!(
+                                %db_name,
+                                sequencer_id,
+                                "Hard limit reached while reading from write buffer, waiting for compaction to catch up",
+                            );
+                            logged_hard_limit = true;
+                        }
                         tokio::time::sleep(Duration::from_millis(100)).await;
                         continue;
                     }
                     Err(e) => {
                         debug!(
-                            ?e,
+                            %e,
+                            %db_name,
+                            sequencer_id,
                             "Error storing SequencedEntry from write buffer in database"
                         );
                         red_observation.error();
@@ -933,7 +952,12 @@
                         watermark = w;
                     }
                     Err(e) => {
-                        debug!(%e, "Error while reading sequencer watermark")
+                        debug!(
+                            %e,
+                            %db_name,
+                            sequencer_id,
+                            "Error while reading sequencer watermark",
+                        )
                     }
                 }
                 watermark_last_updated = Some(Instant::now());
@@ -1408,7 +1432,7 @@ mod tests {
         },
         utils::{make_db, TestDb},
     };
-    use ::test_helpers::assert_contains;
+    use ::test_helpers::{assert_contains, tracing::TracingCapture};
     use arrow::record_batch::RecordBatch;
     use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
     use bytes::Bytes;
@@ -1695,9 +1719,10 @@ mod tests {
 
     #[tokio::test]
     async fn write_buffer_reads_wait_for_compaction() {
-        // these numbers are handtuned to trigger hard buffer limits w/o making the test too big
+        let tracing_capture = TracingCapture::new();
 
         // setup write buffer
+        // these numbers are handtuned to trigger hard buffer limits w/o making the test too big
         let n_entries = 50u64;
         let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
         for sequence_number in 0..n_entries {
@@ -1763,6 +1788,9 @@
         let batches = run_query(db, "select sum(bar) as n from table_1").await;
         let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
         assert_batches_eq!(expected, &batches);
+
+        // check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
+        assert_contains!(tracing_capture.to_string(), "Hard limit reached while reading from write buffer, waiting for compaction to catch up");
     }
 
     #[tokio::test]
diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs
index 21efcaff1f..db0a936456 100644
--- a/server/src/db/replay.rs
+++ b/server/src/db/replay.rs
@@ -188,6 +188,7 @@ pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> {
             }
 
             let entry = Arc::new(entry);
+            let mut logged_hard_limit = false;
             let n_tries = 100;
             for n_try in 1..=n_tries {
                 match db.store_sequenced_entry(Arc::clone(&entry)) {
@@ -195,6 +196,16 @@ pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> {
                         break;
                     }
                     Err(crate::db::Error::HardLimitReached {}) if n_try < n_tries => {
+                        if !logged_hard_limit {
+                            info!(
+                                %db_name,
+                                sequencer_id,
+                                n_try,
+                                n_tries,
+                                "Hard limit reached while replaying, waiting for compaction to catch up",
+                            );
+                            logged_hard_limit = true;
+                        }
                         tokio::time::sleep(Duration::from_millis(100)).await;
                         continue;
                     }
@@ -246,7 +257,7 @@ mod tests {
         min_max_sequence::OptionalMinMaxSequence,
     };
     use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk};
-    use test_helpers::assert_contains;
+    use test_helpers::{assert_contains, tracing::TracingCapture};
     use tokio::task::JoinHandle;
    use tokio_util::sync::CancellationToken;
    use write_buffer::{
@@ -1254,6 +1265,8 @@
 
     #[tokio::test]
     async fn replay_compacts() {
+        let tracing_capture = TracingCapture::new();
+
         // these numbers are handtuned to trigger hard buffer limits w/o making the test too big
         let n_entries = 50u64;
         let sequenced_entries: Vec<_> = (0..n_entries)
@@ -1295,6 +1308,12 @@
         }
         .run()
         .await;
+
+        // check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
+        assert_contains!(
+            tracing_capture.to_string(),
+            "Hard limit reached while replaying, waiting for compaction to catch up"
+        );
     }
 
     #[tokio::test]
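
Note on the field syntax in the log calls above: the `%`/`?` sigils come from the `tracing` macros (which `debug!`/`info!` resolve to here, presumably via a re-export). `%field` records the field with its `Display` impl, `?field` with its `Debug` impl, and a bare identifier records the value directly; that is why the patch switches error fields from `?e` to `%e` and passes `sequencer_id` unadorned. Below is a minimal, self-contained sketch of the same structured-logging style, assuming the plain `tracing` and `tracing-subscriber` crates; the helper function is hypothetical and not part of this patch.

```rust
use tracing::info;

/// Hypothetical helper illustrating the structured-logging style used in the patch.
fn log_hard_limit(db_name: &str, sequencer_id: u32, e: &dyn std::error::Error) {
    info!(
        %db_name,      // recorded via Display
        sequencer_id,  // u32 is recorded as a plain value
        %e,            // errors read better via Display than Debug
        "Hard limit reached, waiting for compaction to catch up"
    );
}

fn main() {
    // A subscriber is needed for the log output to go anywhere.
    tracing_subscriber::fmt::init();

    let err = std::io::Error::new(std::io::ErrorKind::Other, "buffer full");
    log_hard_limit("db1", 0, &err);
}
```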