feat(ingester): Allow read of `IngestState` with exceptions

This will enable some subsystems to trivially respect any `IngestStateError`
set while ignoring specific errors which they may be responsible for
resolving (such as WAL replay needing to ingest from disk when `DiskFull`
is set).
pull/24376/head
Fraser Savage 2023-09-05 16:49:41 +01:00
parent 790e797e6c
commit be9064c75f
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
1 changed files with 50 additions and 3 deletions

View File

@ -132,11 +132,12 @@ impl IngestState {
///
/// # Precedence
///
/// If more than one error state it set, a single error is returned based on
/// If more than one error state is set, a single error is returned based on
/// the following precedence (ordered by highest priority to lowest):
///
/// 1. [`IngestStateError::GracefulStop`]
/// 2. [`IngestStateError::PersistSaturated`].
/// 2. [`IngestStateError::DiskFull`]
/// 3. [`IngestStateError::PersistSaturated`].
///
pub(crate) fn read(&self) -> Result<(), IngestStateError> {
let current = self.state.load(Ordering::Relaxed);
@ -149,13 +150,37 @@ impl IngestState {
Ok(())
}
/// Return the current [`IngestStateError`], if any, filtering out any
/// contained within the given list of `exceptions`.
///
/// # Precedence
///
/// If more than one error state is set, this follows the same precedence
/// rules as [`IngestState::read()`].
pub(crate) fn read_with_exceptions<const N: usize>(
&self,
exceptions: [IngestStateError; N],
) -> Result<(), IngestStateError> {
let exception_mask = exceptions
.into_iter()
.map(IngestStateError::as_bits)
.fold(0, std::ops::BitOr::bitor);
let current = self.state.load(Ordering::Relaxed);
if current & !exception_mask != 0 {
return as_err(current & !exception_mask);
}
Ok(())
}
}
/// Map `state` to exactly one [`IngestStateError`].
///
/// Shutdown always takes precedence, ensuring that once set, this is the error
/// the user always sees (instead of potentially flip-flopping between "shutting
/// down" and "persist saturated").
/// down", "persist saturated" & "disk full").
#[cold]
fn as_err(state: usize) -> Result<(), IngestStateError> {
if state & IngestStateError::GracefulStop.as_bits() != 0 {
@ -308,5 +333,27 @@ mod tests {
prop_assert!(!state.set(error_a));
}
}
/// For every [`IngestStateError`] pair, this test checks that setting
/// the LHS on an [`IngestState`], then reading from it with an ignore
/// exception of the RHS produces the correct result. If the exception
/// matches the error set on the state then it should be filtered and
/// return `Ok`, while an error state that isn't in the exception list
/// is returned.
#[test]
fn test_read_with_exceptions(set_error in ingest_state_errors(), except_error in ingest_state_errors()) {
let state = IngestState::default();
assert!(state.set(set_error));
// Assert the exception filter returns results as expected.
if std::mem::discriminant(&set_error) == std::mem::discriminant(&except_error) {
assert_matches!(state.read_with_exceptions([except_error]), Ok(()));
} else {
assert_matches!(state.read_with_exceptions([except_error]), Err(got_err) => {
prop_assert_eq!(std::mem::discriminant(&set_error), std::mem::discriminant(&got_err));
})
}
}
}
}