Merge pull request #5764 from influxdata/dom/remove-after-watermark-check

fix: remove future offset read check
pull/24376/head
Dom 2022-09-29 10:53:25 +01:00 committed by GitHub
commit 71cab5a1a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 6 additions and 29 deletions

View File

@ -24,7 +24,7 @@ use parking_lot::Mutex;
use rskafka::{
client::{
consumer::{StartOffset, StreamConsumerBuilder},
error::{Error as RSKafkaError, ProtocolError, RequestContext, ServerErrorResponse},
error::{Error as RSKafkaError, ProtocolError},
partition::{OffsetAt, PartitionClient, UnknownTopicHandling},
producer::{BatchProducer, BatchProducerBuilder},
ClientBuilder,
@ -280,34 +280,11 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
let kind = match e {
RSKafkaError::ServerError {
protocol_error: ProtocolError::OffsetOutOfRange,
response:
Some(ServerErrorResponse::PartitionFetchState {
high_watermark, ..
}),
// NOTE: the high watermark included in this
// response is always -1 when reading before/after
// valid offsets.
..
} if high_watermark < 0 => WriteBufferErrorKind::Unknown,
RSKafkaError::ServerError {
protocol_error: ProtocolError::OffsetOutOfRange,
request: RequestContext::Fetch { offset, .. },
response:
Some(ServerErrorResponse::PartitionFetchState {
high_watermark, ..
}),
..
} if high_watermark < offset => {
WriteBufferErrorKind::SequenceNumberAfterWatermark
}
RSKafkaError::ServerError {
protocol_error: ProtocolError::OffsetOutOfRange,
request: RequestContext::Fetch { offset, .. },
response:
Some(ServerErrorResponse::PartitionFetchState {
high_watermark, ..
}),
..
} if high_watermark >= offset => {
WriteBufferErrorKind::SequenceNumberNoLongerExists
}
} => WriteBufferErrorKind::SequenceNumberNoLongerExists,
_ => WriteBufferErrorKind::Unknown,
};
return Err(WriteBufferError::new(kind, e));
@ -349,7 +326,7 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
async fn seek(&mut self, sequence_number: SequenceNumber) -> Result<(), WriteBufferError> {
let offset = sequence_number.get();
let current = self.partition_client.get_offset(OffsetAt::Latest).await?;
if offset > current {
if dbg!(offset) > dbg!(current) {
return Err(WriteBufferError::sequence_number_after_watermark(format!(
"attempted to seek to offset {offset}, but current high \
watermark for partition {p} is {current}",