From 5f49c568c9cdfc5402046727fbc84a4d68078d5d Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 29 Sep 2022 11:33:09 +0200 Subject: [PATCH] fix: remove future offset read check In https://github.com/influxdata/influxdb_iox/pull/5754 I added code at seek() time to check if the offset exists, and refuse to seek if that's not the case, effectively making this check redundant - I left it in on the assumption that some cases previously added would work! Unfortunately this doesn't seem to be the case - performing a read-ahead-of-data and read-behind-data seems to cause the high_watermark to be returned as -1, meaning this code never worked?! This new read-ahead-of-data match arm took priority over the SequenceNumberNoLongerExists arm, effectively preventing the ingester from taking the desired remediation (skipping to most recent write, or erroring, depending on configuration). --- write_buffer/src/kafka/mod.rs | 35 ++++++----------------------------- 1 file changed, 6 insertions(+), 29 deletions(-) diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index ddff84a746..ff98e473d4 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -24,7 +24,7 @@ use parking_lot::Mutex; use rskafka::{ client::{ consumer::{StartOffset, StreamConsumerBuilder}, - error::{Error as RSKafkaError, ProtocolError, RequestContext, ServerErrorResponse}, + error::{Error as RSKafkaError, ProtocolError}, partition::{OffsetAt, PartitionClient, UnknownTopicHandling}, producer::{BatchProducer, BatchProducerBuilder}, ClientBuilder, @@ -280,34 +280,11 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler { let kind = match e { RSKafkaError::ServerError { protocol_error: ProtocolError::OffsetOutOfRange, - response: - Some(ServerErrorResponse::PartitionFetchState { - high_watermark, .. - }), + // NOTE: the high watermark included in this + // response is always -1 when reading before/after + // valid offsets. .. - } if high_watermark < 0 => WriteBufferErrorKind::Unknown, - RSKafkaError::ServerError { - protocol_error: ProtocolError::OffsetOutOfRange, - request: RequestContext::Fetch { offset, .. }, - response: - Some(ServerErrorResponse::PartitionFetchState { - high_watermark, .. - }), - .. - } if high_watermark < offset => { - WriteBufferErrorKind::SequenceNumberAfterWatermark - } - RSKafkaError::ServerError { - protocol_error: ProtocolError::OffsetOutOfRange, - request: RequestContext::Fetch { offset, .. }, - response: - Some(ServerErrorResponse::PartitionFetchState { - high_watermark, .. - }), - .. - } if high_watermark >= offset => { - WriteBufferErrorKind::SequenceNumberNoLongerExists - } + } => WriteBufferErrorKind::SequenceNumberNoLongerExists, _ => WriteBufferErrorKind::Unknown, }; return Err(WriteBufferError::new(kind, e)); @@ -349,7 +326,7 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler { async fn seek(&mut self, sequence_number: SequenceNumber) -> Result<(), WriteBufferError> { let offset = sequence_number.get(); let current = self.partition_client.get_offset(OffsetAt::Latest).await?; - if offset > current { + if dbg!(offset) > dbg!(current) { return Err(WriteBufferError::sequence_number_after_watermark(format!( "attempted to seek to offset {offset}, but current high \ watermark for partition {p} is {current}",