diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index ae0bcb1016..3fa563b188 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -794,6 +794,7 @@ mod tests { assert_eq!(op.namespace(), "bananas"); } ); + test_stream_handler!( non_fatal_stream_offset_error, skip_to_oldest_available = false, @@ -815,6 +816,7 @@ mod tests { assert_eq!(op.namespace(), "bananas"); } ); + test_stream_handler!( skip_to_oldest_on_unknown_sequence_number, skip_to_oldest_available = true, @@ -843,6 +845,7 @@ mod tests { assert_eq!(op.namespace(), "bananas"); } ); + test_stream_handler!( non_fatal_stream_invalid_data, skip_to_oldest_available = false, @@ -864,6 +867,7 @@ mod tests { assert_eq!(op.namespace(), "bananas"); } ); + test_stream_handler!( non_fatal_stream_unknown_error, skip_to_oldest_available = false, diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index e5903f8905..8db3b89cd5 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -278,6 +278,14 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler { Err(e) => { terminated.store(true, Ordering::SeqCst); let kind = match e { + RSKafkaError::ServerError { + protocol_error: ProtocolError::OffsetOutOfRange, + response: + Some(ServerErrorResponse::PartitionFetchState { + high_watermark, .. + }), + .. + } if high_watermark < 0 => WriteBufferErrorKind::Unknown, RSKafkaError::ServerError { protocol_error: ProtocolError::OffsetOutOfRange, request: RequestContext::Fetch { offset, .. },