diff --git a/ingester/src/dml_sink/mock_sink.rs b/ingester/src/dml_sink/mock_sink.rs index 5f8c5ca0a0..f489e1f702 100644 --- a/ingester/src/dml_sink/mock_sink.rs +++ b/ingester/src/dml_sink/mock_sink.rs @@ -4,13 +4,13 @@ use async_trait::async_trait; use dml::DmlOperation; use parking_lot::Mutex; -use super::DmlSink; +use super::{DmlError, DmlSink}; use crate::data::DmlApplyAction; #[derive(Debug, Default)] struct MockDmlSinkState { calls: Vec, - ret: VecDeque>, + ret: VecDeque>, } #[derive(Debug, Default)] @@ -19,9 +19,9 @@ pub struct MockDmlSink { } impl MockDmlSink { - pub fn with_apply_return( + pub(crate) fn with_apply_return( self, - ret: impl Into>>, + ret: impl Into>>, ) -> Self { self.state.lock().ret = ret.into(); self @@ -34,7 +34,8 @@ impl MockDmlSink { #[async_trait] impl DmlSink for MockDmlSink { - async fn apply(&self, op: DmlOperation) -> Result { + type Error = DmlError; + async fn apply(&self, op: DmlOperation) -> Result { let mut state = self.state.lock(); state.calls.push(op); state.ret.pop_front().expect("no mock sink value to return") diff --git a/ingester/src/dml_sink/trait.rs b/ingester/src/dml_sink/trait.rs index 1202271c8a..964c494b8a 100644 --- a/ingester/src/dml_sink/trait.rs +++ b/ingester/src/dml_sink/trait.rs @@ -1,17 +1,27 @@ -use std::{fmt::Debug, ops::Deref, sync::Arc}; +use std::{error::Error, fmt::Debug, ops::Deref, sync::Arc}; use async_trait::async_trait; use dml::DmlOperation; +use thiserror::Error; use crate::data::DmlApplyAction; +#[derive(Debug, Error)] +pub(crate) enum DmlError { + /// An error applying a [`DmlOperation`]. + #[error(transparent)] + Data(#[from] crate::data::Error), +} + /// A [`DmlSink`] handles [`DmlOperation`] instances read from a shard. #[async_trait] pub(crate) trait DmlSink: Debug + Send + Sync { + type Error: Error + Into + Send; + /// Apply `op` read from a shard, returning `Ok(DmlApplyAction::Applied(bool))`, the bool indicating if the /// ingest should be paused. Returns `Ok(DmlApplyAction::Skipped)` if the operation has been /// applied previously and was skipped. - async fn apply(&self, op: DmlOperation) -> Result; + async fn apply(&self, op: DmlOperation) -> Result; } #[async_trait] @@ -19,7 +29,8 @@ impl DmlSink for Arc where T: DmlSink, { - async fn apply(&self, op: DmlOperation) -> Result { + type Error = T::Error; + async fn apply(&self, op: DmlOperation) -> Result { self.deref().apply(op).await } } diff --git a/ingester/src/server/grpc/rpc_write.rs b/ingester/src/server/grpc/rpc_write.rs index 26956d6464..18a8566af1 100644 --- a/ingester/src/server/grpc/rpc_write.rs +++ b/ingester/src/server/grpc/rpc_write.rs @@ -9,7 +9,10 @@ use observability_deps::tracing::*; use thiserror::Error; use tonic::{Request, Response}; -use crate::{data::DmlApplyAction, dml_sink::DmlSink}; +use crate::{ + data::DmlApplyAction, + dml_sink::{DmlError, DmlSink}, +}; // A list of error states when handling an RPC write request. // @@ -29,7 +32,7 @@ enum RpcError { Decode(mutable_batch_pb::decode::Error), #[error(transparent)] - Apply(crate::data::Error), + Apply(DmlError), } impl From for tonic::Status { @@ -40,8 +43,10 @@ impl From for tonic::Status { RpcError::Decode(_) | RpcError::NoPayload | RpcError::NoTables => { Self::invalid_argument(e.to_string()) } - RpcError::Apply(Error::BufferWrite { source }) => map_write_error(source), - RpcError::Apply(Error::ShardNotFound { .. }) => { + RpcError::Apply(DmlError::Data(Error::BufferWrite { source })) => { + map_write_error(source) + } + RpcError::Apply(DmlError::Data(Error::ShardNotFound { .. })) => { // This is not a reachable error state in the gRPC write model, // and is enumerated here purely because of error conflation // (one big error type instead of small, composable errors). @@ -159,7 +164,7 @@ where } Err(e) => { error!(error=%e, "failed to apply DML op"); - return Err(RpcError::Apply(e))?; + return Err(RpcError::Apply(e.into()))?; } } diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index ed46e42a78..58d3bdfc89 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -510,7 +510,7 @@ fn metric_attrs( mod tests { use super::*; use crate::{ - dml_sink::mock_sink::MockDmlSink, + dml_sink::{mock_sink::MockDmlSink, DmlError}, lifecycle::{LifecycleConfig, LifecycleManager}, }; use assert_matches::assert_matches; @@ -996,7 +996,7 @@ mod tests { Ok(DmlOperation::Write(make_write(2222, 2))) ]], sink_rets = [ - Err(crate::data::Error::ShardNotFound{shard_id: ShardId::new(42)}), + Err(DmlError::Data(crate::data::Error::ShardNotFound{shard_id: ShardId::new(42)})), Ok(DmlApplyAction::Applied(true)), ], want_ttbr = 2, diff --git a/ingester/src/stream_handler/sink_adaptor.rs b/ingester/src/stream_handler/sink_adaptor.rs index 30490ccf75..99c6b3bd26 100644 --- a/ingester/src/stream_handler/sink_adaptor.rs +++ b/ingester/src/stream_handler/sink_adaptor.rs @@ -38,7 +38,9 @@ impl IngestSinkAdaptor { #[async_trait] impl DmlSink for IngestSinkAdaptor { - async fn apply(&self, op: DmlOperation) -> Result { + type Error = crate::data::Error; + + async fn apply(&self, op: DmlOperation) -> Result { self.ingest_data .buffer_operation(self.shard_id, op, &self.lifecycle_handle) .await diff --git a/ingester/src/stream_handler/sink_instrumentation.rs b/ingester/src/stream_handler/sink_instrumentation.rs index b7548bf583..8405d8a9a2 100644 --- a/ingester/src/stream_handler/sink_instrumentation.rs +++ b/ingester/src/stream_handler/sink_instrumentation.rs @@ -155,7 +155,9 @@ where T: DmlSink, P: TimeProvider, { - async fn apply(&self, op: DmlOperation) -> Result { + type Error = T::Error; + + async fn apply(&self, op: DmlOperation) -> Result { let meta = op.meta(); // Immediately increment the "bytes read" metric as it records the @@ -236,7 +238,7 @@ where mod tests { use super::*; use crate::{ - dml_sink::mock_sink::MockDmlSink, + dml_sink::{mock_sink::MockDmlSink, DmlError}, stream_handler::mock_watermark_fetcher::MockWatermarkFetcher, }; use assert_matches::assert_matches; @@ -302,9 +304,9 @@ mod tests { async fn test( op: impl Into + Send, metrics: &metric::Registry, - with_sink_return: Result, + with_sink_return: Result, with_fetcher_return: Option, - ) -> Result { + ) -> Result { let op = op.into(); let inner = MockDmlSink::default().with_apply_return([with_sink_return]); let instrumentation = SinkInstrumentation::new( @@ -420,13 +422,16 @@ mod tests { let got = test( op, &metrics, - Err(crate::data::Error::ShardNotFound { + Err(DmlError::Data(crate::data::Error::ShardNotFound { shard_id: ShardId::new(42), - }), + })), Some(12345), ) .await; - assert_matches!(got, Err(crate::data::Error::ShardNotFound { .. })); + assert_matches!( + got, + Err(DmlError::Data(crate::data::Error::ShardNotFound { .. })) + ); // Validate the various write buffer metrics assert_matches!(