feat: improve hard buffer logging and use that as test assertions

pull/24376/head
Marco Neumann 2021-08-04 11:49:05 +02:00
parent 657f469317
commit 164c6e3743
2 changed files with 53 additions and 6 deletions

View File

@ -853,6 +853,7 @@ impl Db {
for (sequencer_id, stream) in write_buffer.streams() { for (sequencer_id, stream) in write_buffer.streams() {
let metrics = self.ingest_metrics.new_sequencer_metrics(sequencer_id); let metrics = self.ingest_metrics.new_sequencer_metrics(sequencer_id);
let fut = self.stream_in_sequenced_entries( let fut = self.stream_in_sequenced_entries(
sequencer_id,
stream.stream, stream.stream,
stream.fetch_high_watermark, stream.fetch_high_watermark,
metrics, metrics,
@ -875,10 +876,12 @@ impl Db {
/// streaming entries from a write buffer. /// streaming entries from a write buffer.
async fn stream_in_sequenced_entries<'a>( async fn stream_in_sequenced_entries<'a>(
&'a self, &'a self,
sequencer_id: u32,
mut stream: BoxStream<'a, Result<SequencedEntry, WriteBufferError>>, mut stream: BoxStream<'a, Result<SequencedEntry, WriteBufferError>>,
f_mark: FetchHighWatermark<'a>, f_mark: FetchHighWatermark<'a>,
mut metrics: SequencerMetrics, mut metrics: SequencerMetrics,
) { ) {
let db_name = self.rules.read().db_name().to_string();
let mut watermark_last_updated: Option<Instant> = None; let mut watermark_last_updated: Option<Instant> = None;
let mut watermark = 0; let mut watermark = 0;
@ -889,7 +892,12 @@ impl Db {
let sequenced_entry = match sequenced_entry_result { let sequenced_entry = match sequenced_entry_result {
Ok(sequenced_entry) => sequenced_entry, Ok(sequenced_entry) => sequenced_entry,
Err(e) => { Err(e) => {
debug!(?e, "Error converting write buffer data to SequencedEntry"); debug!(
%e,
%db_name,
sequencer_id,
"Error converting write buffer data to SequencedEntry",
);
red_observation.client_error(); red_observation.client_error();
continue; continue;
} }
@ -897,6 +905,7 @@ impl Db {
let sequenced_entry = Arc::new(sequenced_entry); let sequenced_entry = Arc::new(sequenced_entry);
// store entry // store entry
let mut logged_hard_limit = false;
loop { loop {
match self.store_sequenced_entry(Arc::clone(&sequenced_entry)) { match self.store_sequenced_entry(Arc::clone(&sequenced_entry)) {
Ok(_) => { Ok(_) => {
@ -905,12 +914,22 @@ impl Db {
} }
Err(Error::HardLimitReached {}) => { Err(Error::HardLimitReached {}) => {
// wait a bit and retry // wait a bit and retry
if !logged_hard_limit {
info!(
%db_name,
sequencer_id,
"Hard limit reached while reading from write buffer, waiting for compaction to catch up",
);
logged_hard_limit = true;
}
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
continue; continue;
} }
Err(e) => { Err(e) => {
debug!( debug!(
?e, %e,
%db_name,
sequencer_id,
"Error storing SequencedEntry from write buffer in database" "Error storing SequencedEntry from write buffer in database"
); );
red_observation.error(); red_observation.error();
@ -933,7 +952,12 @@ impl Db {
watermark = w; watermark = w;
} }
Err(e) => { Err(e) => {
debug!(%e, "Error while reading sequencer watermark") debug!(
%e,
%db_name,
sequencer_id,
"Error while reading sequencer watermark",
)
} }
} }
watermark_last_updated = Some(Instant::now()); watermark_last_updated = Some(Instant::now());
@ -1408,7 +1432,7 @@ mod tests {
}, },
utils::{make_db, TestDb}, utils::{make_db, TestDb},
}; };
use ::test_helpers::assert_contains; use ::test_helpers::{assert_contains, tracing::TracingCapture};
use arrow::record_batch::RecordBatch; use arrow::record_batch::RecordBatch;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use bytes::Bytes; use bytes::Bytes;
@ -1695,9 +1719,10 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn write_buffer_reads_wait_for_compaction() { async fn write_buffer_reads_wait_for_compaction() {
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big let tracing_capture = TracingCapture::new();
// setup write buffer // setup write buffer
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big
let n_entries = 50u64; let n_entries = 50u64;
let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1); let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
for sequence_number in 0..n_entries { for sequence_number in 0..n_entries {
@ -1763,6 +1788,9 @@ mod tests {
let batches = run_query(db, "select sum(bar) as n from table_1").await; let batches = run_query(db, "select sum(bar) as n from table_1").await;
let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"]; let expected = vec!["+----+", "| n |", "+----+", "| 25 |", "+----+"];
assert_batches_eq!(expected, &batches); assert_batches_eq!(expected, &batches);
// check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(tracing_capture.to_string(), "Hard limit reached while reading from write buffer, waiting for compaction to catch up");
} }
#[tokio::test] #[tokio::test]

View File

@ -188,6 +188,7 @@ pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> {
} }
let entry = Arc::new(entry); let entry = Arc::new(entry);
let mut logged_hard_limit = false;
let n_tries = 100; let n_tries = 100;
for n_try in 1..=n_tries { for n_try in 1..=n_tries {
match db.store_sequenced_entry(Arc::clone(&entry)) { match db.store_sequenced_entry(Arc::clone(&entry)) {
@ -195,6 +196,16 @@ pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> {
break; break;
} }
Err(crate::db::Error::HardLimitReached {}) if n_try < n_tries => { Err(crate::db::Error::HardLimitReached {}) if n_try < n_tries => {
if !logged_hard_limit {
info!(
%db_name,
sequencer_id,
n_try,
n_tries,
"Hard limit reached while replaying, waiting for compaction to catch up",
);
logged_hard_limit = true;
}
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
continue; continue;
} }
@ -246,7 +257,7 @@ mod tests {
min_max_sequence::OptionalMinMaxSequence, min_max_sequence::OptionalMinMaxSequence,
}; };
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk}; use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk};
use test_helpers::assert_contains; use test_helpers::{assert_contains, tracing::TracingCapture};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use write_buffer::{ use write_buffer::{
@ -1254,6 +1265,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn replay_compacts() { async fn replay_compacts() {
let tracing_capture = TracingCapture::new();
// these numbers are handtuned to trigger hard buffer limits w/o making the test too big // these numbers are handtuned to trigger hard buffer limits w/o making the test too big
let n_entries = 50u64; let n_entries = 50u64;
let sequenced_entries: Vec<_> = (0..n_entries) let sequenced_entries: Vec<_> = (0..n_entries)
@ -1295,6 +1308,12 @@ mod tests {
} }
.run() .run()
.await; .await;
// check that hard buffer limit was actually hit (otherwise this test is pointless/outdated)
assert_contains!(
tracing_capture.to_string(),
"Hard limit reached while replaying, waiting for compaction to catch up"
);
} }
#[tokio::test] #[tokio::test]