From 5f2f735c7ef342b8af1a9248a091bf81cb3e6265 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 28 Sep 2022 15:17:00 +0200 Subject: [PATCH] fix: spurious watermark < read offset panic In staging we observed an ingester panic due to the write buffer stream yielding an WriteBufferErrorKind::SequenceNumberAfterWatermark, suggesting the ingester was attempting to read from an offset that exceeds the current max write offset in Kafka (high watermark offset). This turned out not to be the case - the partition had a single write at offset 2, and the ingester was attempting to seek to offset 1. The first read would fail (offset 1 does not exist) and the error handling did not account for the high watermark not being correctly set (-1 in the response). I have no idea why rskafka returns this watermark / doesn't retry / etc but this change will allow the ingesters to recover. --- ingester/src/stream_handler/handler.rs | 4 ++++ write_buffer/src/kafka/mod.rs | 8 ++++++++ 2 files changed, 12 insertions(+) diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index ae0bcb1016..3fa563b188 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -794,6 +794,7 @@ mod tests { assert_eq!(op.namespace(), "bananas"); } ); + test_stream_handler!( non_fatal_stream_offset_error, skip_to_oldest_available = false, @@ -815,6 +816,7 @@ mod tests { assert_eq!(op.namespace(), "bananas"); } ); + test_stream_handler!( skip_to_oldest_on_unknown_sequence_number, skip_to_oldest_available = true, @@ -843,6 +845,7 @@ mod tests { assert_eq!(op.namespace(), "bananas"); } ); + test_stream_handler!( non_fatal_stream_invalid_data, skip_to_oldest_available = false, @@ -864,6 +867,7 @@ mod tests { assert_eq!(op.namespace(), "bananas"); } ); + test_stream_handler!( non_fatal_stream_unknown_error, skip_to_oldest_available = false, diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index e5903f8905..8db3b89cd5 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -278,6 +278,14 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler { Err(e) => { terminated.store(true, Ordering::SeqCst); let kind = match e { + RSKafkaError::ServerError { + protocol_error: ProtocolError::OffsetOutOfRange, + response: + Some(ServerErrorResponse::PartitionFetchState { + high_watermark, .. + }), + .. + } if high_watermark < 0 => WriteBufferErrorKind::Unknown, RSKafkaError::ServerError { protocol_error: ProtocolError::OffsetOutOfRange, request: RequestContext::Fetch { offset, .. },