From 475c8a07045daeb731c0b9ee2d7b4be55ffbcb0f Mon Sep 17 00:00:00 2001 From: Luke Bond Date: Fri, 14 Oct 2022 13:06:49 +0100 Subject: [PATCH] fix: only emit ttbr metric for applied ops (#5854) * fix: only emit ttbr metric for applied ops * fix: move DmlApplyAction to s/w accessible * chore: test for skipped ingest; comments and log improvements * fix: fixed ingester test re skipping write Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- ingester/src/data.rs | 28 ++++--- ingester/src/data/namespace.rs | 21 +++-- ingester/src/data/shard.rs | 3 +- ingester/src/data/table.rs | 16 ++-- ingester/src/stream_handler/handler.rs | 77 +++++++++++++------ ingester/src/stream_handler/mock_sink.rs | 7 +- ingester/src/stream_handler/sink.rs | 11 ++- ingester/src/stream_handler/sink_adaptor.rs | 7 +- .../stream_handler/sink_instrumentation.rs | 24 +++--- query_tests/src/scenarios/util.rs | 10 ++- 10 files changed, 133 insertions(+), 71 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index d962fca3ba..c663bae41f 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -172,7 +172,7 @@ impl IngesterData { shard_id: ShardId, dml_operation: DmlOperation, lifecycle_handle: &dyn LifecycleHandle, - ) -> Result { + ) -> Result { let shard_data = self .shards .get(&shard_id) @@ -541,6 +541,16 @@ impl Persister for IngesterData { } } +/// A successful DML apply operation can perform one of these actions +#[derive(Clone, Copy, Debug)] +pub enum DmlApplyAction { + /// The DML operation was successful; bool indicates if ingestion should be paused + Applied(bool), + + /// The DML operation was skipped because it has already been applied + Skipped, +} + #[cfg(test)] mod tests { use std::{ops::DerefMut, sync::Arc, time::Duration}; @@ -634,7 +644,7 @@ mod tests { metrics, Arc::new(SystemProvider::new()), ); - let should_pause = data + let action = data .buffer_operation( shard1.id, DmlOperation::Write(w1.clone()), @@ -642,12 +652,12 @@ mod tests { ) .await .unwrap(); - assert!(!should_pause); - let should_pause = data + assert_matches!(action, DmlApplyAction::Applied(false)); + let action = data .buffer_operation(shard1.id, DmlOperation::Write(w1), &manager.handle()) .await .unwrap(); - assert!(should_pause); + assert_matches!(action, DmlApplyAction::Applied(true)); } #[tokio::test] @@ -715,13 +725,13 @@ mod tests { Arc::new(SystemProvider::new()), ); - let should_pause = data + let action = data .buffer_operation(shard1.id, DmlOperation::Write(w1), &manager.handle()) .await .unwrap(); // Exceeding the row count doesn't pause ingest (like other partition // limits) - assert!(!should_pause); + assert_matches!(action, DmlApplyAction::Applied(false)); let (table_id, partition_id) = { let sd = data.shards.get(&shard1.id).unwrap(); @@ -1291,7 +1301,7 @@ mod tests { // w1 should be ignored because the per-partition replay offset is set // to 1 already, so it shouldn't be buffered and the buffer should // remain empty. - let should_pause = data + let action = data .buffer_operation(DmlOperation::Write(w1), &catalog, &manager.handle()) .await .unwrap(); @@ -1305,7 +1315,7 @@ mod tests { ); assert!(p.data.buffer.is_none()); } - assert!(!should_pause); + assert_matches!(action, DmlApplyAction::Skipped); // w2 should be in the buffer data.buffer_operation(DmlOperation::Write(w2), &catalog, &manager.handle()) diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index 8ccf6e3ded..500345dcf3 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -17,7 +17,7 @@ use super::{ partition::resolver::PartitionProvider, table::{TableData, TableName}, }; -use crate::lifecycle::LifecycleHandle; +use crate::{data::DmlApplyAction, lifecycle::LifecycleHandle}; /// A double-referenced map where [`TableData`] can be looked up by name, or ID. #[derive(Debug, Default)] @@ -177,7 +177,7 @@ impl NamespaceData { dml_operation: DmlOperation, catalog: &Arc, lifecycle_handle: &dyn LifecycleHandle, - ) -> Result { + ) -> Result { let sequence_number = dml_operation .meta() .sequence() @@ -194,6 +194,7 @@ impl NamespaceData { match dml_operation { DmlOperation::Write(write) => { let mut pause_writes = false; + let mut all_skipped = true; // Extract the partition key derived by the router. let partition_key = write @@ -211,7 +212,7 @@ impl NamespaceData { { // lock scope let mut table_data = table_data.write().await; - let should_pause = table_data + let action = table_data .buffer_table_write( sequence_number, b, @@ -219,13 +220,21 @@ impl NamespaceData { lifecycle_handle, ) .await?; - pause_writes = pause_writes || should_pause; + if let DmlApplyAction::Applied(should_pause) = action { + pause_writes = pause_writes || should_pause; + all_skipped = false; + } } #[cfg(test)] self.test_triggers.on_write().await; } - Ok(pause_writes) + if all_skipped { + Ok(DmlApplyAction::Skipped) + } else { + // at least some were applied + Ok(DmlApplyAction::Applied(pause_writes)) + } } DmlOperation::Delete(delete) => { // Deprecated delete support: @@ -239,7 +248,7 @@ impl NamespaceData { "discarding unsupported delete op" ); - Ok(false) + Ok(DmlApplyAction::Applied(false)) } } } diff --git a/ingester/src/data/shard.rs b/ingester/src/data/shard.rs index b01504085f..5b57fa9e27 100644 --- a/ingester/src/data/shard.rs +++ b/ingester/src/data/shard.rs @@ -13,6 +13,7 @@ use write_summary::ShardProgress; use super::{ namespace::{NamespaceData, NamespaceName}, partition::resolver::PartitionProvider, + DmlApplyAction, }; use crate::lifecycle::LifecycleHandle; @@ -99,7 +100,7 @@ impl ShardData { dml_operation: DmlOperation, catalog: &Arc, lifecycle_handle: &dyn LifecycleHandle, - ) -> Result { + ) -> Result { let namespace_data = match self.namespace(&NamespaceName::from(dml_operation.namespace())) { Some(d) => d, None => { diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 472809a783..3e0fd0d6c4 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -8,7 +8,7 @@ use observability_deps::tracing::*; use write_summary::ShardProgress; use super::partition::{resolver::PartitionProvider, PartitionData, UnpersistedPartitionData}; -use crate::{lifecycle::LifecycleHandle, querier_handler::PartitionStatus}; +use crate::{data::DmlApplyAction, lifecycle::LifecycleHandle, querier_handler::PartitionStatus}; /// A double-referenced map where [`PartitionData`] can be looked up by /// [`PartitionKey`], or ID. @@ -137,7 +137,7 @@ impl TableData { batch: MutableBatch, partition_key: PartitionKey, lifecycle_handle: &dyn LifecycleHandle, - ) -> Result { + ) -> Result { let partition_data = match self.partition_data.by_key.get_mut(&partition_key) { Some(p) => p, None => { @@ -165,7 +165,7 @@ impl TableData { op_sequence_number=?sequence_number, "skipping already-persisted write" ); - return Ok(false); + return Ok(DmlApplyAction::Skipped); } } @@ -188,7 +188,7 @@ impl TableData { rows, ); - Ok(should_pause) + Ok(DmlApplyAction::Applied(should_pause)) } /// Return the [`PartitionData`] for the specified ID. @@ -332,7 +332,7 @@ mod tests { assert!(table.partition_data.by_id_mut(PARTITION_ID).is_none()); // Write some test data - let pause = table + let action = table .buffer_table_write( SequenceNumber::new(42), batch, @@ -341,7 +341,7 @@ mod tests { ) .await .expect("buffer op should succeed"); - assert!(!pause); + assert_matches!(action, DmlApplyAction::Applied(false)); // Referencing the partition should succeed assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some()); @@ -394,7 +394,7 @@ mod tests { assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_none()); // Write some test data - let pause = table + let action = table .buffer_table_write( SequenceNumber::new(42), batch, @@ -403,7 +403,7 @@ mod tests { ) .await .expect("buffer op should succeed"); - assert!(!pause); + assert_matches!(action, DmlApplyAction::Applied(false)); // Referencing the partition should succeed assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some()); diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 68a49d3ff8..f1d31b08d5 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -12,7 +12,10 @@ use tokio_util::sync::CancellationToken; use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler}; use super::DmlSink; -use crate::lifecycle::{LifecycleHandle, LifecycleHandleImpl}; +use crate::{ + data::DmlApplyAction, + lifecycle::{LifecycleHandle, LifecycleHandleImpl}, +}; /// When the [`LifecycleManager`] indicates that ingest should be paused because /// of memory pressure, the shard will loop, sleeping this long between @@ -384,7 +387,7 @@ something clever.", op.meta().duration_since_production(&self.time_provider); let should_pause = match self.sink.apply(op).await { - Ok(should_pause) => { + Ok(DmlApplyAction::Applied(should_pause)) => { trace!( kafka_topic=%self.topic_name, shard_index=%self.shard_index, @@ -393,8 +396,31 @@ something clever.", ?op_sequence_number, "successfully applied dml operation" ); + // we only want to report the TTBR if anything was applied + if let Some(delta) = duration_since_production { + // Update the TTBR metric before potentially sleeping. + self.time_to_be_readable.set(delta); + trace!( + kafka_topic=%self.topic_name, + shard_index=%self.shard_index, + shard_id=%self.shard_id, + delta=%delta.as_millis(), + "reporting TTBR for shard (ms)" + ); + } should_pause } + Ok(DmlApplyAction::Skipped) => { + trace!( + kafka_topic=%self.topic_name, + shard_index=%self.shard_index, + shard_id=%self.shard_id, + false, + ?op_sequence_number, + "did not apply dml operation (op was already persisted previously)" + ); + false + } Err(e) => { error!( error=%e, @@ -410,18 +436,6 @@ something clever.", } }; - if let Some(delta) = duration_since_production { - // Update the TTBR metric before potentially sleeping. - self.time_to_be_readable.set(delta); - trace!( - kafka_topic=%self.topic_name, - shard_index=%self.shard_index, - shard_id=%self.shard_id, - delta=%delta.as_millis(), - "reporting TTBR for shard (ms)" - ); - } - if should_pause { // The lifecycle manager may temporarily pause ingest - wait for // persist operations to shed memory pressure if needed. @@ -772,7 +786,7 @@ mod tests { stream_ops = vec![ vec![Ok(DmlOperation::Write(make_write("bananas", 42)))] ], - sink_rets = [Ok(true)], + sink_rets = [Ok(DmlApplyAction::Applied(true))], want_ttbr = 42, want_reset = 0, want_err_metrics = [], @@ -788,7 +802,7 @@ mod tests { stream_ops = vec![ vec![Ok(DmlOperation::Delete(make_delete("platanos", 24)))] ], - sink_rets = [Ok(true)], + sink_rets = [Ok(DmlApplyAction::Applied(true))], want_ttbr = 24, want_reset = 0, want_err_metrics = [], @@ -806,7 +820,7 @@ mod tests { Err(WriteBufferError::new(WriteBufferErrorKind::IO, "explosions")), Ok(DmlOperation::Write(make_write("bananas", 13))) ]], - sink_rets = [Ok(true)], + sink_rets = [Ok(DmlApplyAction::Applied(true))], want_ttbr = 13, want_reset = 0, want_err_metrics = [ @@ -829,7 +843,7 @@ mod tests { Err(WriteBufferError::new(WriteBufferErrorKind::SequenceNumberNoLongerExists, "explosions")), Ok(DmlOperation::Write(make_write("bananas", 31))) ]], - sink_rets = [Ok(true)], + sink_rets = [Ok(DmlApplyAction::Applied(true))], want_ttbr = 31, want_reset = 0, want_err_metrics = [ @@ -858,7 +872,7 @@ mod tests { ], vec![Ok(DmlOperation::Write(make_write("bananas", 31)))], ], - sink_rets = [Ok(true)], + sink_rets = [Ok(DmlApplyAction::Applied(true))], want_ttbr = 31, want_reset = 1, want_err_metrics = [ @@ -880,7 +894,7 @@ mod tests { Err(WriteBufferError::new(WriteBufferErrorKind::InvalidData, "explosions")), Ok(DmlOperation::Write(make_write("bananas", 50))) ]], - sink_rets = [Ok(true)], + sink_rets = [Ok(DmlApplyAction::Applied(true))], want_ttbr = 50, want_reset = 0, want_err_metrics = [ @@ -902,7 +916,7 @@ mod tests { Err(WriteBufferError::new(WriteBufferErrorKind::Unknown, "explosions")), Ok(DmlOperation::Write(make_write("bananas", 60))) ]], - sink_rets = [Ok(true)], + sink_rets = [Ok(DmlApplyAction::Applied(true))], want_ttbr = 60, want_reset = 0, want_err_metrics = [ @@ -932,7 +946,7 @@ mod tests { want_sink = [] ); - // Asserts the TTBR is uses the last value in the stream. + // Asserts the TTBR used is the last value in the stream. test_stream_handler!( reports_last_ttbr, skip_to_oldest_available = false, @@ -942,7 +956,7 @@ mod tests { Ok(DmlOperation::Write(make_write("bananas", 3))), Ok(DmlOperation::Write(make_write("bananas", 42))), ]], - sink_rets = [Ok(true), Ok(false), Ok(true), Ok(false),], + sink_rets = [Ok(DmlApplyAction::Applied(true)), Ok(DmlApplyAction::Applied(false)), Ok(DmlApplyAction::Applied(true)), Ok(DmlApplyAction::Applied(false)),], want_ttbr = 42, want_reset = 0, want_err_metrics = [ @@ -967,7 +981,7 @@ mod tests { ]], sink_rets = [ Err(crate::data::Error::NamespaceNotFound{namespace: "bananas".to_string() }), - Ok(true), + Ok(DmlApplyAction::Applied(true)), ], want_ttbr = 2, want_reset = 0, @@ -986,6 +1000,21 @@ mod tests { } ); + test_stream_handler!( + skipped_op_no_ttbr, + skip_to_oldest_available = false, + stream_ops = vec![vec![Ok(DmlOperation::Write(make_write("some_op", 1)))]], + sink_rets = [Ok(DmlApplyAction::Skipped)], + want_ttbr = 0, + want_reset = 0, + want_err_metrics = [], + want_sink = [ + DmlOperation::Write(op), + ] => { + assert_eq!(op.namespace(), "some_op"); + } + ); + #[derive(Debug)] struct EmptyWriteBufferStreamHandler {} diff --git a/ingester/src/stream_handler/mock_sink.rs b/ingester/src/stream_handler/mock_sink.rs index 974e7e57aa..5f8c5ca0a0 100644 --- a/ingester/src/stream_handler/mock_sink.rs +++ b/ingester/src/stream_handler/mock_sink.rs @@ -5,11 +5,12 @@ use dml::DmlOperation; use parking_lot::Mutex; use super::DmlSink; +use crate::data::DmlApplyAction; #[derive(Debug, Default)] struct MockDmlSinkState { calls: Vec, - ret: VecDeque>, + ret: VecDeque>, } #[derive(Debug, Default)] @@ -20,7 +21,7 @@ pub struct MockDmlSink { impl MockDmlSink { pub fn with_apply_return( self, - ret: impl Into>>, + ret: impl Into>>, ) -> Self { self.state.lock().ret = ret.into(); self @@ -33,7 +34,7 @@ impl MockDmlSink { #[async_trait] impl DmlSink for MockDmlSink { - async fn apply(&self, op: DmlOperation) -> Result { + async fn apply(&self, op: DmlOperation) -> Result { let mut state = self.state.lock(); state.calls.push(op); state.ret.pop_front().expect("no mock sink value to return") diff --git a/ingester/src/stream_handler/sink.rs b/ingester/src/stream_handler/sink.rs index 825b012ce9..1202271c8a 100644 --- a/ingester/src/stream_handler/sink.rs +++ b/ingester/src/stream_handler/sink.rs @@ -3,12 +3,15 @@ use std::{fmt::Debug, ops::Deref, sync::Arc}; use async_trait::async_trait; use dml::DmlOperation; +use crate::data::DmlApplyAction; + /// A [`DmlSink`] handles [`DmlOperation`] instances read from a shard. #[async_trait] pub(crate) trait DmlSink: Debug + Send + Sync { - /// Apply `op` read from a shard, returning `Ok(true)` if ingest should - /// be paused. - async fn apply(&self, op: DmlOperation) -> Result; + /// Apply `op` read from a shard, returning `Ok(DmlApplyAction::Applied(bool))`, the bool indicating if the + /// ingest should be paused. Returns `Ok(DmlApplyAction::Skipped)` if the operation has been + /// applied previously and was skipped. + async fn apply(&self, op: DmlOperation) -> Result; } #[async_trait] @@ -16,7 +19,7 @@ impl DmlSink for Arc where T: DmlSink, { - async fn apply(&self, op: DmlOperation) -> Result { + async fn apply(&self, op: DmlOperation) -> Result { self.deref().apply(op).await } } diff --git a/ingester/src/stream_handler/sink_adaptor.rs b/ingester/src/stream_handler/sink_adaptor.rs index 4f885a496c..3780f3da5b 100644 --- a/ingester/src/stream_handler/sink_adaptor.rs +++ b/ingester/src/stream_handler/sink_adaptor.rs @@ -7,7 +7,10 @@ use data_types::ShardId; use dml::DmlOperation; use super::DmlSink; -use crate::{data::IngesterData, lifecycle::LifecycleHandleImpl}; +use crate::{ + data::{DmlApplyAction, IngesterData}, + lifecycle::LifecycleHandleImpl, +}; /// Provides a [`DmlSink`] implementation for a [`IngesterData`] instance. #[derive(Debug)] @@ -35,7 +38,7 @@ impl IngestSinkAdaptor { #[async_trait] impl DmlSink for IngestSinkAdaptor { - async fn apply(&self, op: DmlOperation) -> Result { + async fn apply(&self, op: DmlOperation) -> Result { self.ingest_data .buffer_operation(self.shard_id, op, &self.lifecycle_handle) .await diff --git a/ingester/src/stream_handler/sink_instrumentation.rs b/ingester/src/stream_handler/sink_instrumentation.rs index 998e14bb48..98292ebc7a 100644 --- a/ingester/src/stream_handler/sink_instrumentation.rs +++ b/ingester/src/stream_handler/sink_instrumentation.rs @@ -9,6 +9,8 @@ use iox_time::{SystemProvider, TimeProvider}; use metric::{Attributes, DurationHistogram, U64Counter, U64Gauge}; use trace::span::{SpanExt, SpanRecorder}; +use crate::data::DmlApplyAction; + use super::DmlSink; /// A [`WatermarkFetcher`] abstracts a source of the write buffer high watermark @@ -155,7 +157,7 @@ where T: DmlSink, P: TimeProvider, { - async fn apply(&self, op: DmlOperation) -> Result { + async fn apply(&self, op: DmlOperation) -> Result { let meta = op.meta(); // Immediately increment the "bytes read" metric as it records the @@ -292,9 +294,9 @@ mod tests { async fn test( op: impl Into + Send, metrics: &metric::Registry, - with_sink_return: Result, + with_sink_return: Result, with_fetcher_return: Option, - ) -> Result { + ) -> Result { let op = op.into(); let inner = MockDmlSink::default().with_apply_return([with_sink_return]); let instrumentation = SinkInstrumentation::new( @@ -342,8 +344,8 @@ mod tests { ); let op = make_write(meta); - let got = test(op, &metrics, Ok(true), Some(12345)).await; - assert_matches!(got, Ok(true)); + let got = test(op, &metrics, Ok(DmlApplyAction::Applied(true)), Some(12345)).await; + assert_matches!(got, Ok(DmlApplyAction::Applied(true))); // Validate the various write buffer metrics assert_matches!( @@ -487,8 +489,8 @@ mod tests { ); let op = make_write(meta); - let got = test(op, &metrics, Ok(true), None).await; - assert_matches!(got, Ok(true)); + let got = test(op, &metrics, Ok(DmlApplyAction::Applied(true)), None).await; + assert_matches!(got, Ok(DmlApplyAction::Applied(true))); // Validate the various write buffer metrics assert_matches!( @@ -556,8 +558,8 @@ mod tests { ); let op = make_write(meta); - let got = test(op, &metrics, Ok(true), Some(1)).await; - assert_matches!(got, Ok(true)); + let got = test(op, &metrics, Ok(DmlApplyAction::Applied(true)), Some(1)).await; + assert_matches!(got, Ok(DmlApplyAction::Applied(true))); // Validate the various write buffer metrics assert_matches!( @@ -617,7 +619,7 @@ mod tests { let meta = DmlMeta::unsequenced(None); let op = make_write(meta); - let _ = test(op, &metrics, Ok(true), Some(12345)).await; + let _ = test(op, &metrics, Ok(DmlApplyAction::Applied(true)), Some(12345)).await; } // The instrumentation emits per-shard metrics, so upon observing an op @@ -639,6 +641,6 @@ mod tests { ); let op = make_write(meta); - let _ = test(op, &metrics, Ok(true), Some(12345)).await; + let _ = test(op, &metrics, Ok(DmlApplyAction::Applied(true)), Some(12345)).await; } } diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 477503504b..14ef4ea294 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -14,7 +14,9 @@ use generated_types::{ }; use influxdb_iox_client::flight::{low_level::LowLevelMessage, Error as FlightError}; use ingester::{ - data::{partition::resolver::CatalogPartitionResolver, IngesterData, Persister}, + data::{ + partition::resolver::CatalogPartitionResolver, DmlApplyAction, IngesterData, Persister, + }, lifecycle::mock_handle::MockLifecycleHandle, querier_handler::{prepare_data_to_querier, FlatIngesterQueryResponse, IngesterQueryResponse}, }; @@ -721,12 +723,14 @@ impl MockIngester { async fn buffer_operation(&mut self, dml_operation: DmlOperation) { let lifecycle_handle = MockLifecycleHandle::default(); - let should_pause = self + let action = self .ingester_data .buffer_operation(self.shard.shard.id, dml_operation, &lifecycle_handle) .await .unwrap(); - assert!(!should_pause); + if let DmlApplyAction::Applied(should_pause) = action { + assert!(!should_pause); + } } /// Persists the given set of partitions.