//! A handler of streamed ops from a write buffer. use std::{fmt::Debug, time::Duration}; use data_types::{SequenceNumber, ShardId, ShardIndex}; use dml::DmlOperation; use futures::{pin_mut, FutureExt, StreamExt}; use iox_time::{SystemProvider, TimeProvider}; use metric::{Attributes, DurationCounter, DurationHistogram, U64Counter}; use observability_deps::tracing::*; use tokio_util::sync::CancellationToken; use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler}; use super::DmlSink; use crate::{ data::DmlApplyAction, lifecycle::{LifecycleHandle, LifecycleHandleImpl}, }; /// When the [`LifecycleManager`] indicates that ingest should be paused because /// of memory pressure, the shard will loop, sleeping this long between /// calls to [`LifecycleHandle::can_resume_ingest()`] with the manager if it /// can resume ingest. /// /// [`LifecycleManager`]: crate::lifecycle::LifecycleManager /// [`LifecycleHandle::can_resume_ingest()`]: crate::lifecycle::LifecycleHandle::can_resume_ingest() const INGEST_POLL_INTERVAL: Duration = Duration::from_millis(100); /// A [`SequencedStreamHandler`] consumes a sequence of [`DmlOperation`] from a /// shard stream and pushes them into the configured [`DmlSink`]. /// /// Ingest reads are rate limited by the [`LifecycleManager`] it is initialised /// by, pausing until the [`LifecycleHandle::can_resume_ingest()`] obtained from /// it returns true, and TTBR / error metrics are emitted on a per-shard /// basis. /// /// [`LifecycleManager`]: crate::lifecycle::LifecycleManager /// [`LifecycleHandle::can_resume_ingest()`]: crate::lifecycle::LifecycleHandle::can_resume_ingest() #[derive(Debug)] pub(crate) struct SequencedStreamHandler { /// Creator/manager of the stream of DML ops write_buffer_stream_handler: I, current_sequence_number: SequenceNumber, /// An output sink that processes DML operations and applies them to /// in-memory state. 
sink: O, /// A handle to the [`LifecycleManager`] singleton that may periodically /// request ingest be paused to control memory pressure. /// /// [`LifecycleManager`]: crate::lifecycle::LifecycleManager lifecycle_handle: LifecycleHandleImpl, // Metrics time_provider: T, time_to_be_readable: DurationHistogram, /// Duration of time ingest is paused at the request of the LifecycleManager pause_duration: DurationCounter, /// Errors during op stream reading shard_unknown_sequence_number_count: U64Counter, shard_invalid_data_count: U64Counter, shard_unknown_error_count: U64Counter, sink_apply_error_count: U64Counter, skipped_sequence_number_amount: U64Counter, /// Reset count shard_reset_count: U64Counter, /// Log context fields - otherwise unused. topic_name: String, shard_index: ShardIndex, shard_id: ShardId, skip_to_oldest_available: bool, } impl SequencedStreamHandler { /// Initialise a new [`SequencedStreamHandler`], consuming from `stream` and /// dispatching successfully decoded [`DmlOperation`] instances to `sink`. /// /// A [`SequencedStreamHandler`] starts actively consuming items from /// `stream` once [`SequencedStreamHandler::run()`] is called, and /// gracefully stops when `shutdown` is cancelled. 
#[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        write_buffer_stream_handler: I,
        current_sequence_number: SequenceNumber,
        sink: O,
        lifecycle_handle: LifecycleHandleImpl,
        topic_name: String,
        shard_index: ShardIndex,
        shard_id: ShardId,
        metrics: &metric::Registry,
        skip_to_oldest_available: bool,
    ) -> Self {
        // TTBR
        let time_to_be_readable = metrics
            .register_metric::<DurationHistogram>(
                "ingester_ttbr",
                "distribution of duration between producer writing \
                 to consumer putting into queryable cache",
            )
            .recorder(metric_attrs(shard_index, &topic_name, None, false));

        // Lifecycle-driven ingest pause duration
        let pause_duration = metrics
            .register_metric::<DurationCounter>(
                "ingester_paused_duration_total",
                "duration of time ingestion has been paused by the lifecycle manager",
            )
            .recorder(&[]);

        // Error count metrics
        let ingest_errors = metrics.register_metric::<U64Counter>(
            "ingester_stream_handler_error",
            "ingester op fetching and buffering errors",
        );

        // Every error recorder shares the same shard/topic attributes and is
        // flagged as potential data loss; only the "error" attribute differs.
        let error_recorder = |kind: &'static str| {
            ingest_errors.recorder(metric_attrs(shard_index, &topic_name, Some(kind), true))
        };
        let shard_unknown_sequence_number_count = error_recorder("shard_unknown_sequence_number");
        let shard_invalid_data_count = error_recorder("shard_invalid_data");
        let shard_unknown_error_count = error_recorder("shard_unknown_error");
        let sink_apply_error_count = error_recorder("sink_apply_error");
        let skipped_sequence_number_amount = error_recorder("skipped_sequence_number_amount");

        // reset count
        let shard_reset_count = metrics
            .register_metric::<U64Counter>(
                "shard_reset_count",
                "how often a shard was already reset",
            )
            .recorder(metric_attrs(shard_index, &topic_name, None, true));

        Self {
            write_buffer_stream_handler,
            current_sequence_number,
            sink,
            lifecycle_handle,
            time_provider: SystemProvider::default(),
            time_to_be_readable,
            pause_duration,
            shard_unknown_sequence_number_count,
            shard_invalid_data_count,
            shard_unknown_error_count,
            sink_apply_error_count,
            skipped_sequence_number_amount,
            shard_reset_count,
            topic_name,
            shard_index,
            shard_id,
            skip_to_oldest_available,
        }
    }

    /// Switch to the specified [`TimeProvider`] implementation.
    ///
    /// All other state (stream handler, sink, metrics recorders and log
    /// context) is moved into the returned handler unchanged.
    #[cfg(test)]
    pub(crate) fn with_time_provider<T>(self, provider: T) -> SequencedStreamHandler<I, O, T> {
        SequencedStreamHandler {
            write_buffer_stream_handler: self.write_buffer_stream_handler,
            current_sequence_number: self.current_sequence_number,
            sink: self.sink,
            lifecycle_handle: self.lifecycle_handle,
            time_provider: provider,
            time_to_be_readable: self.time_to_be_readable,
            pause_duration: self.pause_duration,
            shard_unknown_sequence_number_count: self.shard_unknown_sequence_number_count,
            shard_invalid_data_count: self.shard_invalid_data_count,
            shard_unknown_error_count: self.shard_unknown_error_count,
            sink_apply_error_count: self.sink_apply_error_count,
            skipped_sequence_number_amount: self.skipped_sequence_number_amount,
            shard_reset_count: self.shard_reset_count,
            topic_name: self.topic_name,
            shard_index: self.shard_index,
            shard_id: self.shard_id,
            skip_to_oldest_available: self.skip_to_oldest_available,
        }
    }
}

impl<I, O, T> SequencedStreamHandler<I, O, T>
where
    I: WriteBufferStreamHandler,
    O: DmlSink,
    T: TimeProvider,
{
    /// Run the stream handler, consuming items from the stream provided by the
    /// [`WriteBufferStreamHandler`] and applying them to the [`DmlSink`].
    ///
    /// This method blocks until gracefully shutdown by cancelling the
    /// `shutdown` [`CancellationToken`]. Once cancelled, this handler will
    /// complete the current operation it is processing before this method
    /// returns.
    ///
    /// # Panics
    ///
    /// This method panics if the input stream ends (yields a `None`), or if
    /// the stream reports [`WriteBufferErrorKind::SequenceNumberAfterWatermark`].
pub async fn run(mut self, shutdown: CancellationToken) { let shutdown_fut = shutdown.cancelled().fuse(); pin_mut!(shutdown_fut); let mut stream = self.write_buffer_stream_handler.stream().await; let mut sequence_number_before_reset: Option = None; loop { // Wait for a DML operation from the shard, or a graceful stop signal. let maybe_op = futures::select!( next = stream.next().fuse() => next, _ = shutdown_fut => { info!( kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, "stream handler shutdown", ); return; } ); // Read a DML op from the write buffer, logging and emitting metrics // for any potential errors to enable alerting on potential data // loss. // // If this evaluation results in no viable DML op to apply to the // DmlSink, return None rather than continuing the loop to ensure // ingest pauses are respected. let maybe_op = match maybe_op { Some(Ok(op)) => { if let Some(sequence_number) = op.meta().sequence().map(|s| s.sequence_number) { if let Some(before_reset) = sequence_number_before_reset { // We've requested the stream to be reset and we've skipped this many // sequence numbers. Store in a metric once. if before_reset != sequence_number { let difference = sequence_number.get() - before_reset.get(); self.skipped_sequence_number_amount.inc(difference as u64); } sequence_number_before_reset = None; } self.current_sequence_number = sequence_number; } Some(op) } Some(Err(e)) if e.kind() == WriteBufferErrorKind::SequenceNumberNoLongerExists => { // If we get an unknown sequence number, and we're fine potentially having // missed writes that were too old to be retained, try resetting the stream // once and getting the next operation again. 
// Keep the current sequence number to compare with the sequence number if self.skip_to_oldest_available && sequence_number_before_reset.is_none() { warn!( error=%e, kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, potential_data_loss=true, "unable to read from desired sequence number offset \ - reset stream to oldest available data" ); self.shard_reset_count.inc(1); sequence_number_before_reset = Some(self.current_sequence_number); self.write_buffer_stream_handler.reset_to_earliest(); stream = self.write_buffer_stream_handler.stream().await; continue; } else { error!( error=%e, kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, potential_data_loss=true, "unable to read from desired sequence number offset \ - aborting ingest due to configuration" ); self.shard_unknown_sequence_number_count.inc(1); None } } Some(Err(e)) if e.kind() == WriteBufferErrorKind::IO => { warn!( error=%e, kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, "I/O error reading from shard" ); tokio::time::sleep(Duration::from_secs(1)).await; None } Some(Err(e)) if e.kind() == WriteBufferErrorKind::InvalidData => { // The DmlOperation could not be de-serialized from the // shard message. // // This is almost certainly data loss as the write will not // be applied/persisted. error!( error=%e, kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, potential_data_loss=true, "unable to deserialize dml operation" ); self.shard_invalid_data_count.inc(1); None } Some(Err(e)) if e.kind() == WriteBufferErrorKind::SequenceNumberAfterWatermark => { panic!( "\ Shard Index {:?} stream for topic {} has a high watermark BEFORE the sequence number we want. This \ is either a bug (see https://github.com/influxdata/rskafka/issues/147 for example) or means that \ someone re-created the shard and data is lost. 
In both cases, it's better to panic than to try \ something clever.", self.shard_index, self.topic_name, ) } Some(Err(e)) => { error!( error=%e, kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, potential_data_loss=true, "unhandled error converting write buffer data to DmlOperation", ); self.shard_unknown_error_count.inc(1); tokio::time::sleep(Duration::from_secs(1)).await; None } None => { panic!( "shard index {:?} stream for topic {} ended without graceful shutdown", self.shard_index, self.topic_name ); } }; // If a DML operation was successfully decoded, push it into the // DmlSink. self.maybe_apply_op(maybe_op).await; } } async fn maybe_apply_op(&mut self, op: Option) { if let Some(op) = op { let op_sequence_number = op.meta().sequence().map(|s| s.sequence_number); // Emit per-op debug info. trace!( kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, op_size=op.size(), op_namespace=op.namespace(), ?op_sequence_number, "decoded dml operation" ); // Calculate how long it has been since production by // checking the producer timestamp (added in the router // when dispatching the request). let duration_since_production = op.meta().duration_since_production(&self.time_provider); let should_pause = match self.sink.apply(op).await { Ok(DmlApplyAction::Applied(should_pause)) => { trace!( kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, %should_pause, ?op_sequence_number, "successfully applied dml operation" ); // we only want to report the TTBR if anything was applied if let Some(delta) = duration_since_production { // Update the TTBR metric before potentially sleeping. 
self.time_to_be_readable.record(delta); trace!( kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, delta=%delta.as_millis(), "reporting TTBR for shard (ms)" ); } should_pause } Ok(DmlApplyAction::Skipped) => { trace!( kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, false, ?op_sequence_number, "did not apply dml operation (op was already persisted previously)" ); false } Err(e) => { error!( error=%e, kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, ?op_sequence_number, potential_data_loss=true, "failed to apply dml operation" ); self.sink_apply_error_count.inc(1); return; } }; if should_pause { // The lifecycle manager may temporarily pause ingest - wait for // persist operations to shed memory pressure if needed. self.pause_ingest().await; } } } async fn pause_ingest(&mut self) { // Record how long this pause is, for logging purposes. let started_at = self.time_provider.now(); warn!( kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, "pausing ingest until persistence has run" ); while !self.lifecycle_handle.can_resume_ingest() { // Incrementally report on the sleeps (as opposed to // measuring the start/end duration) in order to report // a blocked ingester _before_ it recovers. // // While the actual sleep may be slightly longer than // INGEST_POLL_INTERVAL, it's not likely to be a useful // distinction in the metrics. 
self.pause_duration.inc(INGEST_POLL_INTERVAL); tokio::time::sleep(INGEST_POLL_INTERVAL).await; } let duration_str = self .time_provider .now() .checked_duration_since(started_at) .map(|v| format!("{}ms", v.as_millis())) .unwrap_or_else(|| "unknown".to_string()); info!( kafka_topic=%self.topic_name, shard_index=%self.shard_index, shard_id=%self.shard_id, pause_duration=%duration_str, "resuming ingest" ); } } fn metric_attrs( shard_index: ShardIndex, topic: &str, err: Option<&'static str>, data_loss: bool, ) -> Attributes { let mut attr = Attributes::from([ ("kafka_partition", shard_index.to_string().into()), ("kafka_topic", topic.to_string().into()), ]); if let Some(err) = err { attr.insert("error", err) } if data_loss { attr.insert("potential_data_loss", "true"); } attr } #[cfg(test)] mod tests { use std::sync::Arc; use assert_matches::assert_matches; use async_trait::async_trait; use data_types::{DeletePredicate, Sequence, TimestampRange}; use dml::{DmlDelete, DmlMeta, DmlWrite}; use futures::stream::{self, BoxStream}; use iox_time::{SystemProvider, Time}; use metric::{HistogramObservation, Metric}; use mutable_batch_lp::lines_to_batches; use once_cell::sync::Lazy; use test_helpers::timeout::FutureTimeout; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; use write_buffer::core::WriteBufferError; use super::*; use crate::{ lifecycle::{LifecycleConfig, LifecycleManager}, stream_handler::mock_sink::MockDmlSink, }; static TEST_TIME: Lazy