fix: spurious watermark < read offset panic

In staging we observed an ingester panic due to the write buffer stream
yielding a WriteBufferErrorKind::SequenceNumberAfterWatermark,
suggesting the ingester was attempting to read from an offset that
exceeds the current max write offset in Kafka (high watermark offset).

This turned out not to be the case - the partition had a single write at
offset 2, and the ingester was attempting to seek to offset 1. The first
read would fail (offset 1 does not exist) and the error handling did not
account for the high watermark not being correctly set (-1 in the
response).

I have no idea why rskafka returns this watermark / doesn't retry / etc.,
but this change will allow the ingesters to recover.
pull/24376/head
Dom Dwyer 2022-09-28 15:17:00 +02:00
parent 13ed1c089a
commit 5f2f735c7e
2 changed files with 12 additions and 0 deletions

View File

@ -794,6 +794,7 @@ mod tests {
assert_eq!(op.namespace(), "bananas");
}
);
test_stream_handler!(
non_fatal_stream_offset_error,
skip_to_oldest_available = false,
@ -815,6 +816,7 @@ mod tests {
assert_eq!(op.namespace(), "bananas");
}
);
test_stream_handler!(
skip_to_oldest_on_unknown_sequence_number,
skip_to_oldest_available = true,
@ -843,6 +845,7 @@ mod tests {
assert_eq!(op.namespace(), "bananas");
}
);
test_stream_handler!(
non_fatal_stream_invalid_data,
skip_to_oldest_available = false,
@ -864,6 +867,7 @@ mod tests {
assert_eq!(op.namespace(), "bananas");
}
);
test_stream_handler!(
non_fatal_stream_unknown_error,
skip_to_oldest_available = false,

View File

@ -278,6 +278,14 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
Err(e) => {
terminated.store(true, Ordering::SeqCst);
let kind = match e {
RSKafkaError::ServerError {
protocol_error: ProtocolError::OffsetOutOfRange,
response:
Some(ServerErrorResponse::PartitionFetchState {
high_watermark, ..
}),
..
} if high_watermark < 0 => WriteBufferErrorKind::Unknown,
RSKafkaError::ServerError {
protocol_error: ProtocolError::OffsetOutOfRange,
request: RequestContext::Fetch { offset, .. },