From 413635d25ab8d7a19d8eb34f7b862ec8051982e9 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 10 Mar 2023 17:14:47 -0500 Subject: [PATCH 1/3] feat: Don't add a partition to skipped_compactions if it makes progress If a partition takes longer than `partition_timeout` to compact, but it did make _some_ progress, let the compactor try that partition again at a later time so that compaction for the partition will eventually complete. If a partition times out and _no_ progress has been made, then still add it to the skipped_compactions table because it's either too big to ever compact or is otherwise stuck. Closes influxdata/idpe#17234. --- Cargo.lock | 1 + compactor2/Cargo.toml | 1 + compactor2/src/driver.rs | 190 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 179 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a0a661f7db..79944f0bd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,6 +975,7 @@ name = "compactor2" version = "0.1.0" dependencies = [ "arrow_util", + "assert_matches", "async-trait", "backoff", "bytes", diff --git a/compactor2/Cargo.toml b/compactor2/Cargo.toml index c0149292ec..8757cdd1b3 100644 --- a/compactor2/Cargo.toml +++ b/compactor2/Cargo.toml @@ -33,6 +33,7 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] arrow_util = { path = "../arrow_util" } +assert_matches = "1" compactor2_test_utils = { path = "../compactor2_test_utils" } iox_tests = { path = "../iox_tests" } test_helpers = { path = "../test_helpers"} diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index eef72200ae..70fa41a9ff 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -1,14 +1,15 @@ -use std::{num::NonZeroUsize, sync::Arc, time::Duration}; +use std::{fmt, future::Future, num::NonZeroUsize, sync::Arc, time::Duration}; use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId}; use futures::StreamExt; use 
observability_deps::tracing::info; use parquet_file::ParquetFilePath; +use tokio::sync::watch::{self, Sender}; use tracker::InstrumentedAsyncSemaphore; use crate::{ components::{scratchpad::Scratchpad, Components}, - error::DynError, + error::{DynError, ErrorKind, SimpleError}, file_classification::{FileToSplit, FilesToCompactOrSplit}, partition_info::PartitionInfo, PlanIR, @@ -49,19 +50,37 @@ async fn compact_partition( info!(partition_id = partition_id.get(), "compact partition",); let mut scratchpad = components.scratchpad_gen.pad(); - let res = tokio::time::timeout( - partition_timeout, - try_compact_partition( - partition_id, - job_semaphore, - Arc::clone(&components), - scratchpad.as_mut(), - ), - ) + let res = timeout_with_progress_checking(partition_timeout, |transmit_progress_signal| { + let components = Arc::clone(&components); + async { + try_compact_partition( + partition_id, + job_semaphore, + components, + scratchpad.as_mut(), + transmit_progress_signal, + ) + .await + } + }) .await; + let res = match res { - Ok(res) => res, - Err(e) => Err(Box::new(e) as _), + // If `try_compact_partition` timed out and didn't make any progress, something is wrong + // with this partition and it should get added to the `skipped_compactions` table by + // sending a timeout error to the `partition_done_sink`. + TimeoutWithProgress::NoWorkTimeOutError => Err(Box::new(SimpleError::new( + ErrorKind::Timeout, + "timeout without making any progress", + )) as _), + // If `try_compact_partition` timed out but *did* make some progress, this is fine, don't + // add it to the `skipped_compactions` table. + TimeoutWithProgress::SomeWorkTryAgain => Ok(()), + // If `try_compact_partition` finished before the timeout, return the `Result` that it + // returned. If an error was returned, there could be something wrong with the partition; + // let the `partition_done_sink` decide if the error means the partition should be added + // to the `skipped_compactions` table or not. 
+ TimeoutWithProgress::Completed(res) => res, }; components .partition_done_sink @@ -163,6 +182,7 @@ async fn try_compact_partition( job_semaphore: Arc, components: Arc, scratchpad_ctx: &mut dyn Scratchpad, + transmit_progress_signal: Sender, ) -> Result<(), DynError> { let mut files = components.partition_files_source.fetch(partition_id).await; let partition_info = components.partition_info_source.fetch(partition_id).await?; @@ -257,6 +277,13 @@ async fn try_compact_partition( } files = files_next; + + // Report to `timeout_with_progress_checking` that some progress has been made; stop + // if sending this signal fails because something has gone terribly wrong for the other + // end of the channel to not be listening anymore. + if let Err(e) = transmit_progress_signal.send(true) { + return Err(Box::new(e)); + } } } @@ -507,3 +534,140 @@ async fn update_catalog( (created_file_params, upgraded_files) } + +/// Returned information from a call to [`timeout_with_progress_checking`]. +enum TimeoutWithProgress { + /// The inner future timed out and _no_ progress was reported. + NoWorkTimeOutError, + /// The inner future timed out and _some_ progress was reported. + SomeWorkTryAgain, + /// The inner future completed before the timeout and returned a value of type `R`. + Completed(R), +} + +impl fmt::Debug for TimeoutWithProgress { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::NoWorkTimeOutError => write!(f, "TimeoutWithProgress::NoWorkTimeOutError"), + Self::SomeWorkTryAgain => write!(f, "TimeoutWithProgress::SomeWorkTryAgain"), + Self::Completed(r) => write!(f, "TimeoutWithProgress::Completed({:?})", r), + } + } +} + +/// Set an overall timeout for a future that has some concept of making progress or not, and if the +/// future times out, send a different [`TimeoutWithProgress`] value depending on whether there +/// was no work done or some work done. 
This lets the calling code assess whether it might be worth +/// trying the operation again to make more progress, or whether the future is somehow stuck or +/// takes too long to ever work. +/// +/// # Parameters +/// +/// * `full_timeout`: The timeout duration the future is allowed to spend +/// * `inner_future`: A function taking a [`tokio::sync::watch::Sender`] that returns a +/// future. This function expects that the body of the future will call `send(true)` to indicate +/// that progress has been made, however the future defines "progress". If the future times out, +/// this function will return `TimeoutWithProgress::SomeWorkTryAgain` if it has received at least +/// one `true` value and `TimeoutWithProgress::NoWorkTimeOutError` if nothing was sent. If the +/// future finishes before `full_timeout`, this function will return +/// `TimeoutWithProgress::Completed` and pass along the returned value from the future. +async fn timeout_with_progress_checking( + full_timeout: Duration, + inner_future: F, +) -> TimeoutWithProgress +where + F: FnOnce(Sender) -> Fut + Send, + Fut: Future + Send, +{ + let (transmit_progress_signal, receive_progress_signal) = watch::channel(false); + + let called_inner_future = inner_future(transmit_progress_signal); + + match tokio::time::timeout(full_timeout, called_inner_future).await { + Ok(val) => TimeoutWithProgress::Completed(val), + Err(_) => { + let progress = *receive_progress_signal.borrow(); + if progress { + TimeoutWithProgress::SomeWorkTryAgain + } else { + TimeoutWithProgress::NoWorkTimeOutError + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::assert_matches; + + #[tokio::test] + async fn reports_progress_completes_and_returns_ok_under_timeout() { + let state = timeout_with_progress_checking(Duration::from_millis(5), |tx| async move { + // No loop in this test; report progress and then return success to simulate + // successfully completing all work before the timeout. 
+ let _ignore_send_errors = tx.send(true); + Result::<(), String>::Ok(()) + }) + .await; + + assert_matches!(state, TimeoutWithProgress::Completed(Ok(()))); + } + + #[tokio::test] + async fn reports_progress_completes_and_returns_err_under_timeout() { + let state = timeout_with_progress_checking(Duration::from_millis(5), |tx| async move { + // No loop in this test; report progress and then return an error to simulate + // a problem occurring before the timeout. + let _ignore_send_errors = tx.send(true); + Result::<(), String>::Err(String::from("there was a problem")) + }) + .await; + + assert_matches!( + state, + TimeoutWithProgress::Completed(Err(e)) if e == "there was a problem" + ); + } + + #[tokio::test] + async fn doesnt_report_progress_returns_err_under_timeout() { + let state = timeout_with_progress_checking(Duration::from_millis(5), |tx| async move { + // No loop in this test; report progress and then return success to simulate + // successfully completing all work before the timeout. + let _ignore_send_errors = tx.send(true); + Result::<(), String>::Ok(()) + }) + .await; + + assert_matches!(state, TimeoutWithProgress::Completed(Ok(()))); + } + + #[tokio::test] + async fn reports_progress_then_times_out() { + let state = timeout_with_progress_checking(Duration::from_millis(5), |tx| async move { + loop { + // Sleep for 2 ms, which should be able to run and report progress and then timeout + // because it will never complete + tokio::time::sleep(Duration::from_millis(2)).await; + let _ignore_send_errors = tx.send(true); + } + }) + .await; + + assert_matches!(state, TimeoutWithProgress::SomeWorkTryAgain); + } + + #[tokio::test] + async fn doesnt_report_progress_then_times_out() { + let state = timeout_with_progress_checking(Duration::from_millis(5), |_tx| async move { + // No loop in this test; don't report progress and then sleep enough that this will + // time out. 
+ tokio::time::sleep(Duration::from_millis(10)).await; + Result::<(), String>::Ok(()) + }) + .await; + + assert_matches!(state, TimeoutWithProgress::NoWorkTimeOutError); + } +} From 6fd910a0919ce87793621eddc117a18e6213f7a6 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 15 Mar 2023 17:06:58 -0400 Subject: [PATCH 2/3] fix: Make really really sure this test hits the timeout --- compactor2/src/driver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index 70fa41a9ff..48b22772e0 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -663,7 +663,7 @@ mod tests { let state = timeout_with_progress_checking(Duration::from_millis(5), |_tx| async move { // No loop in this test; don't report progress and then sleep enough that this will // time out. - tokio::time::sleep(Duration::from_millis(10)).await; + tokio::time::sleep(Duration::from_secs(1)).await; Result::<(), String>::Ok(()) }) .await; From c9bfcb4501242cff5cdae98325767c98ea1a6bf1 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 15 Mar 2023 17:10:10 -0400 Subject: [PATCH 3/3] test: Actually make this test a different case, oops --- compactor2/src/driver.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index 48b22772e0..9b275a27b6 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -632,15 +632,15 @@ mod tests { #[tokio::test] async fn doesnt_report_progress_returns_err_under_timeout() { - let state = timeout_with_progress_checking(Duration::from_millis(5), |tx| async move { - // No loop in this test; report progress and then return success to simulate - // successfully completing all work before the timeout. 
- let _ignore_send_errors = tx.send(true); - Result::<(), String>::Ok(()) + let state = timeout_with_progress_checking(Duration::from_millis(5), |_tx| async move { + Result::<(), String>::Err(String::from("there was a problem")) }) .await; - assert_matches!(state, TimeoutWithProgress::Completed(Ok(()))); + assert_matches!( + state, + TimeoutWithProgress::Completed(Err(e)) if e == "there was a problem" + ); } #[tokio::test]