From bcbf7b4f4616ab463792a3a2d2765749b73fa9f9 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 20 May 2022 13:02:54 -0400 Subject: [PATCH] refactor: Move error handling logic to be all together --- ingester/src/stream_handler/handler.rs | 46 +++++++++++--------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index ab3ab57c9f..3f914d64d1 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -196,22 +196,6 @@ where } ); - // If we get an unknown sequence number, and we're fine potentially having missed - // writes that were too old to be retained, try resetting the stream once and getting - // the next operation again. - if self.skip_to_oldest_available - && !reset_to_earliest_once - && matches!( - &maybe_op, - Some(Err(e)) if e.kind() == WriteBufferErrorKind::UnknownSequenceNumber - ) - { - self.write_buffer_stream_handler.reset_to_earliest(); - stream = self.write_buffer_stream_handler.stream().await; - reset_to_earliest_once = true; - continue; - } - // Read a DML op from the write buffer, logging and emitting metrics // for any potential errors to enable alerting on potential data // loss. @@ -219,18 +203,28 @@ where // If this evaluation results in no viable DML op to apply to the // DmlSink, return None rather than continuing the loop to ensure // ingest pauses are respected. - let maybe_op = match dbg!(maybe_op) { + let maybe_op = match maybe_op { Some(Ok(op)) => Some(op), Some(Err(e)) if e.kind() == WriteBufferErrorKind::UnknownSequenceNumber => { - error!( - error=%e, - kafka_topic=%self.kafka_topic_name, - kafka_partition=%self.kafka_partition, - potential_data_loss=true, - "unable to read from desired sequencer offset" - ); - self.seq_unknown_sequence_number_count.inc(1); - None + // If we get an unknown sequence number, and we're fine potentially having + // missed writes that were too old to be retained, try resetting the stream + // once and getting the next operation again. + if self.skip_to_oldest_available && !reset_to_earliest_once { + self.write_buffer_stream_handler.reset_to_earliest(); + stream = self.write_buffer_stream_handler.stream().await; + reset_to_earliest_once = true; + continue; + } else { + error!( + error=%e, + kafka_topic=%self.kafka_topic_name, + kafka_partition=%self.kafka_partition, + potential_data_loss=true, + "unable to read from desired sequencer offset" + ); + self.seq_unknown_sequence_number_count.inc(1); + None + } } Some(Err(e)) if e.kind() == WriteBufferErrorKind::IO => { warn!(