refactor: Move error handling logic to be all together

pull/24376/head
Carol (Nichols || Goulding) 2022-05-20 13:02:54 -04:00
parent 549dd497ea
commit bcbf7b4f46
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed files with 20 additions and 26 deletions

View File

@ -196,22 +196,6 @@ where
}
);
// If we get an unknown sequence number, and we're fine potentially having missed
// writes that were too old to be retained, try resetting the stream once and getting
// the next operation again.
if self.skip_to_oldest_available
&& !reset_to_earliest_once
&& matches!(
&maybe_op,
Some(Err(e)) if e.kind() == WriteBufferErrorKind::UnknownSequenceNumber
)
{
self.write_buffer_stream_handler.reset_to_earliest();
stream = self.write_buffer_stream_handler.stream().await;
reset_to_earliest_once = true;
continue;
}
// Read a DML op from the write buffer, logging and emitting metrics
// for any potential errors to enable alerting on potential data
// loss.
@ -219,18 +203,28 @@ where
// If this evaluation results in no viable DML op to apply to the
// DmlSink, return None rather than continuing the loop to ensure
// ingest pauses are respected.
let maybe_op = match dbg!(maybe_op) {
let maybe_op = match maybe_op {
Some(Ok(op)) => Some(op),
Some(Err(e)) if e.kind() == WriteBufferErrorKind::UnknownSequenceNumber => {
error!(
error=%e,
kafka_topic=%self.kafka_topic_name,
kafka_partition=%self.kafka_partition,
potential_data_loss=true,
"unable to read from desired sequencer offset"
);
self.seq_unknown_sequence_number_count.inc(1);
None
// If we get an unknown sequence number, and we're fine potentially having
// missed writes that were too old to be retained, try resetting the stream
// once and getting the next operation again.
if self.skip_to_oldest_available && !reset_to_earliest_once {
self.write_buffer_stream_handler.reset_to_earliest();
stream = self.write_buffer_stream_handler.stream().await;
reset_to_earliest_once = true;
continue;
} else {
error!(
error=%e,
kafka_topic=%self.kafka_topic_name,
kafka_partition=%self.kafka_partition,
potential_data_loss=true,
"unable to read from desired sequencer offset"
);
self.seq_unknown_sequence_number_count.inc(1);
None
}
}
Some(Err(e)) if e.kind() == WriteBufferErrorKind::IO => {
warn!(