feat: Don't add a partition to skipped_compactions if it makes progress

If a partition takes longer than `partition_timeout` to compact, but it
did make _some_ progress, let the compactor try that partition again at
a later time so that compaction for the partition will eventually
complete.

If a partition times out and _no_ progress has been made, then still add
it to the skipped_compactions table because it's either too big to ever
compact or is otherwise stuck.

Closes influxdata/idpe#17234.
pull/24376/head
Carol (Nichols || Goulding) 2023-03-10 17:14:47 -05:00
parent 2817e1adb4
commit 413635d25a
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
3 changed files with 179 additions and 13 deletions

1
Cargo.lock generated
View File

@ -975,6 +975,7 @@ name = "compactor2"
version = "0.1.0"
dependencies = [
"arrow_util",
"assert_matches",
"async-trait",
"backoff",
"bytes",

View File

@ -33,6 +33,7 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
arrow_util = { path = "../arrow_util" }
assert_matches = "1"
compactor2_test_utils = { path = "../compactor2_test_utils" }
iox_tests = { path = "../iox_tests" }
test_helpers = { path = "../test_helpers"}

View File

@ -1,14 +1,15 @@
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use std::{fmt, future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
use futures::StreamExt;
use observability_deps::tracing::info;
use parquet_file::ParquetFilePath;
use tokio::sync::watch::{self, Sender};
use tracker::InstrumentedAsyncSemaphore;
use crate::{
components::{scratchpad::Scratchpad, Components},
error::DynError,
error::{DynError, ErrorKind, SimpleError},
file_classification::{FileToSplit, FilesToCompactOrSplit},
partition_info::PartitionInfo,
PlanIR,
@ -49,19 +50,37 @@ async fn compact_partition(
info!(partition_id = partition_id.get(), "compact partition",);
let mut scratchpad = components.scratchpad_gen.pad();
let res = tokio::time::timeout(
partition_timeout,
try_compact_partition(
partition_id,
job_semaphore,
Arc::clone(&components),
scratchpad.as_mut(),
),
)
let res = timeout_with_progress_checking(partition_timeout, |transmit_progress_signal| {
let components = Arc::clone(&components);
async {
try_compact_partition(
partition_id,
job_semaphore,
components,
scratchpad.as_mut(),
transmit_progress_signal,
)
.await
}
})
.await;
let res = match res {
Ok(res) => res,
Err(e) => Err(Box::new(e) as _),
// If `try_compact_partition` timed out and didn't make any progress, something is wrong
// with this partition and it should get added to the `skipped_compactions` table by
// sending a timeout error to the `partition_done_sink`.
TimeoutWithProgress::NoWorkTimeOutError => Err(Box::new(SimpleError::new(
ErrorKind::Timeout,
"timeout without making any progress",
)) as _),
// If `try_compact_partition` timed out but *did* make some progress, this is fine, don't
// add it to the `skipped_compactions` table.
TimeoutWithProgress::SomeWorkTryAgain => Ok(()),
// If `try_compact_partition` finished before the timeout, return the `Result` that it
// returned. If an error was returned, there could be something wrong with the partition;
// let the `partition_done_sink` decide if the error means the partition should be added
// to the `skipped_compactions` table or not.
TimeoutWithProgress::Completed(res) => res,
};
components
.partition_done_sink
@ -163,6 +182,7 @@ async fn try_compact_partition(
job_semaphore: Arc<InstrumentedAsyncSemaphore>,
components: Arc<Components>,
scratchpad_ctx: &mut dyn Scratchpad,
transmit_progress_signal: Sender<bool>,
) -> Result<(), DynError> {
let mut files = components.partition_files_source.fetch(partition_id).await;
let partition_info = components.partition_info_source.fetch(partition_id).await?;
@ -257,6 +277,13 @@ async fn try_compact_partition(
}
files = files_next;
// Report to `timeout_with_progress_checking` that some progress has been made. Stop if
// sending this signal fails: that means the other end of the channel is no longer
// listening, so something has gone terribly wrong.
if let Err(e) = transmit_progress_signal.send(true) {
return Err(Box::new(e));
}
}
}
@ -507,3 +534,140 @@ async fn update_catalog(
(created_file_params, upgraded_files)
}
/// Outcome of running a future through [`timeout_with_progress_checking`].
///
/// The type parameter `R` is the output type of the wrapped future; it is only carried by the
/// [`Completed`](Self::Completed) variant.
///
/// The value is marked `#[must_use]` because silently dropping it loses the distinction between
/// "finished", "timed out after some progress" and "timed out without any progress".
#[must_use = "the timeout outcome says whether the work finished, should be retried, or is stuck"]
enum TimeoutWithProgress<R> {
    /// The inner future timed out and _no_ progress was reported.
    NoWorkTimeOutError,
    /// The inner future timed out and _some_ progress was reported.
    SomeWorkTryAgain,
    /// The inner future completed before the timeout and returned a value of type `R`.
    Completed(R),
}
/// Hand-written `Debug` output that always spells out the enum name in front of the variant,
/// e.g. `TimeoutWithProgress::Completed(..)`.
impl<R: fmt::Debug> fmt::Debug for TimeoutWithProgress<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only `Completed` carries a payload that needs formatting; handle it first and let the
        // payload-free variants share a single `write_str` call below.
        let unit_label = match self {
            Self::Completed(inner) => {
                return write!(f, "TimeoutWithProgress::Completed({:?})", inner);
            }
            Self::NoWorkTimeOutError => "TimeoutWithProgress::NoWorkTimeOutError",
            Self::SomeWorkTryAgain => "TimeoutWithProgress::SomeWorkTryAgain",
        };
        f.write_str(unit_label)
    }
}
/// Run a future under an overall deadline and, if the deadline is hit, report whether the future
/// had signalled progress before it was cut off.
///
/// The three possible outcomes are expressed as a [`TimeoutWithProgress`] value, which lets the
/// caller decide whether retrying later is worthwhile (some progress was made) or whether the
/// work is stuck or too large to ever finish in time (no progress was made).
///
/// # Parameters
///
/// * `full_timeout`: Maximum duration the future is allowed to run for.
/// * `inner_future`: A function that receives a [`tokio::sync::watch::Sender<bool>`] and returns
///   the future to run. The future is expected to call `send(true)` whenever it has made
///   progress, however it defines "progress". The channel starts out holding `false`.
///
/// # Returns
///
/// * `TimeoutWithProgress::Completed(value)` if the future finished before `full_timeout`, where
///   `value` is whatever the future returned.
/// * `TimeoutWithProgress::SomeWorkTryAgain` if the future timed out and the value currently held
///   by the channel is `true`.
/// * `TimeoutWithProgress::NoWorkTimeOutError` if the future timed out and the channel still
///   holds `false`.
async fn timeout_with_progress_checking<F, Fut>(
    full_timeout: Duration,
    inner_future: F,
) -> TimeoutWithProgress<Fut::Output>
where
    F: FnOnce(Sender<bool>) -> Fut + Send,
    Fut: Future + Send,
{
    // Start at `false` so that a future which never sends anything reads as "no progress".
    let (progress_tx, progress_rx) = watch::channel(false);

    let outcome = tokio::time::timeout(full_timeout, inner_future(progress_tx)).await;
    if let Ok(output) = outcome {
        return TimeoutWithProgress::Completed(output);
    }

    // The deadline elapsed; look at the value currently stored in the channel to find out
    // whether the future reported progress.
    let made_progress = *progress_rx.borrow();
    if made_progress {
        TimeoutWithProgress::SomeWorkTryAgain
    } else {
        TimeoutWithProgress::NoWorkTimeOutError
    }
}
/// Unit tests for [`timeout_with_progress_checking`], covering each combination of "reports
/// progress or not" with "finishes before the timeout or not".
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;

    /// Progress reported, future returns `Ok` before the timeout => `Completed(Ok(()))`.
    #[tokio::test]
    async fn reports_progress_completes_and_returns_ok_under_timeout() {
        let state = timeout_with_progress_checking(Duration::from_millis(5), |tx| async move {
            // No loop in this test; report progress and then return success to simulate
            // successfully completing all work before the timeout.
            let _ignore_send_errors = tx.send(true);
            Result::<(), String>::Ok(())
        })
        .await;
        assert_matches!(state, TimeoutWithProgress::Completed(Ok(())));
    }

    /// Progress reported, future returns `Err` before the timeout => `Completed(Err(_))`.
    #[tokio::test]
    async fn reports_progress_completes_and_returns_err_under_timeout() {
        let state = timeout_with_progress_checking(Duration::from_millis(5), |tx| async move {
            // No loop in this test; report progress and then return an error to simulate
            // a problem occurring before the timeout.
            let _ignore_send_errors = tx.send(true);
            Result::<(), String>::Err(String::from("there was a problem"))
        })
        .await;
        assert_matches!(
            state,
            TimeoutWithProgress::Completed(Err(e)) if e == "there was a problem"
        );
    }

    /// No progress reported, future returns `Err` before the timeout => the error is still passed
    /// through as `Completed(Err(_))`.
    #[tokio::test]
    async fn doesnt_report_progress_returns_err_under_timeout() {
        let state = timeout_with_progress_checking(Duration::from_millis(5), |_tx| async move {
            // No loop in this test; don't report progress and return an error right away to
            // simulate a problem occurring before any work was done and before the timeout.
            Result::<(), String>::Err(String::from("there was a problem"))
        })
        .await;
        assert_matches!(
            state,
            TimeoutWithProgress::Completed(Err(e)) if e == "there was a problem"
        );
    }

    /// Progress reported, future never finishes => `SomeWorkTryAgain`.
    #[tokio::test]
    async fn reports_progress_then_times_out() {
        let state = timeout_with_progress_checking(Duration::from_millis(5), |tx| async move {
            loop {
                // Sleep for 2 ms, which should be able to run and report progress and then timeout
                // because it will never complete
                tokio::time::sleep(Duration::from_millis(2)).await;
                let _ignore_send_errors = tx.send(true);
            }
        })
        .await;
        assert_matches!(state, TimeoutWithProgress::SomeWorkTryAgain);
    }

    /// No progress reported, future outlives the timeout => `NoWorkTimeOutError`.
    #[tokio::test]
    async fn doesnt_report_progress_then_times_out() {
        let state = timeout_with_progress_checking(Duration::from_millis(5), |_tx| async move {
            // No loop in this test; don't report progress and then sleep enough that this will
            // time out.
            tokio::time::sleep(Duration::from_millis(10)).await;
            Result::<(), String>::Ok(())
        })
        .await;
        assert_matches!(state, TimeoutWithProgress::NoWorkTimeOutError);
    }
}