diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index ddff84a746..ff98e473d4 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -24,7 +24,7 @@ use parking_lot::Mutex; use rskafka::{ client::{ consumer::{StartOffset, StreamConsumerBuilder}, - error::{Error as RSKafkaError, ProtocolError, RequestContext, ServerErrorResponse}, + error::{Error as RSKafkaError, ProtocolError}, partition::{OffsetAt, PartitionClient, UnknownTopicHandling}, producer::{BatchProducer, BatchProducerBuilder}, ClientBuilder, @@ -280,34 +280,11 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler { let kind = match e { RSKafkaError::ServerError { protocol_error: ProtocolError::OffsetOutOfRange, - response: - Some(ServerErrorResponse::PartitionFetchState { - high_watermark, .. - }), + // NOTE: the high watermark included in this + // response is always -1 when reading before/after + // valid offsets. .. - } if high_watermark < 0 => WriteBufferErrorKind::Unknown, - RSKafkaError::ServerError { - protocol_error: ProtocolError::OffsetOutOfRange, - request: RequestContext::Fetch { offset, .. }, - response: - Some(ServerErrorResponse::PartitionFetchState { - high_watermark, .. - }), - .. - } if high_watermark < offset => { - WriteBufferErrorKind::SequenceNumberAfterWatermark - } - RSKafkaError::ServerError { - protocol_error: ProtocolError::OffsetOutOfRange, - request: RequestContext::Fetch { offset, .. }, - response: - Some(ServerErrorResponse::PartitionFetchState { - high_watermark, .. - }), - .. - } if high_watermark >= offset => { - WriteBufferErrorKind::SequenceNumberNoLongerExists - } + } => WriteBufferErrorKind::SequenceNumberNoLongerExists, _ => WriteBufferErrorKind::Unknown, }; return Err(WriteBufferError::new(kind, e)); @@ -349,7 +326,7 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler { async fn seek(&mut self, sequence_number: SequenceNumber) -> Result<(), WriteBufferError> { let offset = sequence_number.get(); let current = self.partition_client.get_offset(OffsetAt::Latest).await?; - if offset > current { + if dbg!(offset) > dbg!(current) { return Err(WriteBufferError::sequence_number_after_watermark(format!( "attempted to seek to offset {offset}, but current high \ watermark for partition {p} is {current}",