diff --git a/ingester/src/ingest_state.rs b/ingester/src/ingest_state.rs index 6769d0b9e8..4c5030eaee 100644 --- a/ingester/src/ingest_state.rs +++ b/ingester/src/ingest_state.rs @@ -132,11 +132,12 @@ impl IngestState { /// /// # Precedence /// - /// If more than one error state it set, a single error is returned based on + /// If more than one error state is set, a single error is returned based on /// the following precedence (ordered by highest priority to lowest): /// /// 1. [`IngestStateError::GracefulStop`] - /// 2. [`IngestStateError::PersistSaturated`]. + /// 2. [`IngestStateError::DiskFull`] + /// 3. [`IngestStateError::PersistSaturated`]. /// pub(crate) fn read(&self) -> Result<(), IngestStateError> { let current = self.state.load(Ordering::Relaxed); @@ -149,13 +150,37 @@ impl IngestState { Ok(()) } + + /// Return the current [`IngestStateError`], if any, filtering out any + /// contained within the given list of `exceptions`. + /// + /// # Precedence + /// + /// If more than one error state is set, this follows the same precedence + /// rules as [`IngestState::read()`]. + pub(crate) fn read_with_exceptions( + &self, + exceptions: [IngestStateError; N], + ) -> Result<(), IngestStateError> { + let exception_mask = exceptions + .into_iter() + .map(IngestStateError::as_bits) + .fold(0, std::ops::BitOr::bitor); + let current = self.state.load(Ordering::Relaxed); + + if current & !exception_mask != 0 { + return as_err(current & !exception_mask); + } + + Ok(()) + } } /// Map `state` to exactly one [`IngestStateError`]. /// /// Shutdown always takes precedence, ensuring that once set, this is the error /// the user always sees (instead of potentially flip-flopping between "shutting -/// down" and "persist saturated"). +/// down", "persist saturated" & "disk full"). #[cold] fn as_err(state: usize) -> Result<(), IngestStateError> { if state & IngestStateError::GracefulStop.as_bits() != 0 { @@ -308,5 +333,27 @@ mod tests { prop_assert!(!state.set(error_a)); } } + + /// For every [`IngestStateError`] pair, this test checks that setting + /// the LHS on an [`IngestState`], then reading from it with an ignore + /// exception of the RHS produces the correct result. If the exception + /// matches the error set on the state then it should be filtered and + /// return `Ok`, while an error state that isn't in the exception list + /// is returned. + #[test] + fn test_read_with_exceptions(set_error in ingest_state_errors(), except_error in ingest_state_errors()) { + let state = IngestState::default(); + + assert!(state.set(set_error)); + + // Assert the exception filter returns results as expected. + if std::mem::discriminant(&set_error) == std::mem::discriminant(&except_error) { + assert_matches!(state.read_with_exceptions([except_error]), Ok(())); + } else { + assert_matches!(state.read_with_exceptions([except_error]), Err(got_err) => { + prop_assert_eq!(std::mem::discriminant(&set_error), std::mem::discriminant(&got_err)); + }) + } + } } }