Merge pull request #6189 from influxdata/dom/dml-sink-error

refactor: decouple DmlSink error type
pull/24376/head
Dom 2022-11-21 15:05:03 +00:00 committed by GitHub
commit 586c3fe8ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 47 additions and 23 deletions

View File

@ -4,13 +4,13 @@ use async_trait::async_trait;
use dml::DmlOperation;
use parking_lot::Mutex;
use super::DmlSink;
use super::{DmlError, DmlSink};
use crate::data::DmlApplyAction;
#[derive(Debug, Default)]
struct MockDmlSinkState {
calls: Vec<DmlOperation>,
ret: VecDeque<Result<DmlApplyAction, crate::data::Error>>,
ret: VecDeque<Result<DmlApplyAction, DmlError>>,
}
#[derive(Debug, Default)]
@ -19,9 +19,9 @@ pub struct MockDmlSink {
}
impl MockDmlSink {
pub fn with_apply_return(
pub(crate) fn with_apply_return(
self,
ret: impl Into<VecDeque<Result<DmlApplyAction, crate::data::Error>>>,
ret: impl Into<VecDeque<Result<DmlApplyAction, DmlError>>>,
) -> Self {
self.state.lock().ret = ret.into();
self
@ -34,7 +34,8 @@ impl MockDmlSink {
#[async_trait]
impl DmlSink for MockDmlSink {
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, crate::data::Error> {
type Error = DmlError;
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, DmlError> {
let mut state = self.state.lock();
state.calls.push(op);
state.ret.pop_front().expect("no mock sink value to return")

View File

@ -1,17 +1,27 @@
use std::{fmt::Debug, ops::Deref, sync::Arc};
use std::{error::Error, fmt::Debug, ops::Deref, sync::Arc};
use async_trait::async_trait;
use dml::DmlOperation;
use thiserror::Error;
use crate::data::DmlApplyAction;
#[derive(Debug, Error)]
pub(crate) enum DmlError {
/// An error applying a [`DmlOperation`].
#[error(transparent)]
Data(#[from] crate::data::Error),
}
/// A [`DmlSink`] handles [`DmlOperation`] instances read from a shard.
#[async_trait]
pub(crate) trait DmlSink: Debug + Send + Sync {
type Error: Error + Into<DmlError> + Send;
/// Apply `op` read from a shard, returning `Ok(DmlApplyAction::Applied(bool))`, the bool indicating if the
/// ingest should be paused. Returns `Ok(DmlApplyAction::Skipped)` if the operation has been
/// applied previously and was skipped.
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, crate::data::Error>;
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, Self::Error>;
}
#[async_trait]
@ -19,7 +29,8 @@ impl<T> DmlSink for Arc<T>
where
T: DmlSink,
{
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, crate::data::Error> {
type Error = T::Error;
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, Self::Error> {
self.deref().apply(op).await
}
}

View File

@ -9,7 +9,10 @@ use observability_deps::tracing::*;
use thiserror::Error;
use tonic::{Request, Response};
use crate::{data::DmlApplyAction, dml_sink::DmlSink};
use crate::{
data::DmlApplyAction,
dml_sink::{DmlError, DmlSink},
};
// A list of error states when handling an RPC write request.
//
@ -29,7 +32,7 @@ enum RpcError {
Decode(mutable_batch_pb::decode::Error),
#[error(transparent)]
Apply(crate::data::Error),
Apply(DmlError),
}
impl From<RpcError> for tonic::Status {
@ -40,8 +43,10 @@ impl From<RpcError> for tonic::Status {
RpcError::Decode(_) | RpcError::NoPayload | RpcError::NoTables => {
Self::invalid_argument(e.to_string())
}
RpcError::Apply(Error::BufferWrite { source }) => map_write_error(source),
RpcError::Apply(Error::ShardNotFound { .. }) => {
RpcError::Apply(DmlError::Data(Error::BufferWrite { source })) => {
map_write_error(source)
}
RpcError::Apply(DmlError::Data(Error::ShardNotFound { .. })) => {
// This is not a reachable error state in the gRPC write model,
// and is enumerated here purely because of error conflation
// (one big error type instead of small, composable errors).
@ -159,7 +164,7 @@ where
}
Err(e) => {
error!(error=%e, "failed to apply DML op");
return Err(RpcError::Apply(e))?;
return Err(RpcError::Apply(e.into()))?;
}
}

View File

@ -510,7 +510,7 @@ fn metric_attrs(
mod tests {
use super::*;
use crate::{
dml_sink::mock_sink::MockDmlSink,
dml_sink::{mock_sink::MockDmlSink, DmlError},
lifecycle::{LifecycleConfig, LifecycleManager},
};
use assert_matches::assert_matches;
@ -996,7 +996,7 @@ mod tests {
Ok(DmlOperation::Write(make_write(2222, 2)))
]],
sink_rets = [
Err(crate::data::Error::ShardNotFound{shard_id: ShardId::new(42)}),
Err(DmlError::Data(crate::data::Error::ShardNotFound{shard_id: ShardId::new(42)})),
Ok(DmlApplyAction::Applied(true)),
],
want_ttbr = 2,

View File

@ -38,7 +38,9 @@ impl IngestSinkAdaptor {
#[async_trait]
impl DmlSink for IngestSinkAdaptor {
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, crate::data::Error> {
type Error = crate::data::Error;
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, Self::Error> {
self.ingest_data
.buffer_operation(self.shard_id, op, &self.lifecycle_handle)
.await

View File

@ -155,7 +155,9 @@ where
T: DmlSink,
P: TimeProvider,
{
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, crate::data::Error> {
type Error = T::Error;
async fn apply(&self, op: DmlOperation) -> Result<DmlApplyAction, Self::Error> {
let meta = op.meta();
// Immediately increment the "bytes read" metric as it records the
@ -236,7 +238,7 @@ where
mod tests {
use super::*;
use crate::{
dml_sink::mock_sink::MockDmlSink,
dml_sink::{mock_sink::MockDmlSink, DmlError},
stream_handler::mock_watermark_fetcher::MockWatermarkFetcher,
};
use assert_matches::assert_matches;
@ -302,9 +304,9 @@ mod tests {
async fn test(
op: impl Into<DmlOperation> + Send,
metrics: &metric::Registry,
with_sink_return: Result<DmlApplyAction, crate::data::Error>,
with_sink_return: Result<DmlApplyAction, DmlError>,
with_fetcher_return: Option<i64>,
) -> Result<DmlApplyAction, crate::data::Error> {
) -> Result<DmlApplyAction, DmlError> {
let op = op.into();
let inner = MockDmlSink::default().with_apply_return([with_sink_return]);
let instrumentation = SinkInstrumentation::new(
@ -420,13 +422,16 @@ mod tests {
let got = test(
op,
&metrics,
Err(crate::data::Error::ShardNotFound {
Err(DmlError::Data(crate::data::Error::ShardNotFound {
shard_id: ShardId::new(42),
}),
})),
Some(12345),
)
.await;
assert_matches!(got, Err(crate::data::Error::ShardNotFound { .. }));
assert_matches!(
got,
Err(DmlError::Data(crate::data::Error::ShardNotFound { .. }))
);
// Validate the various write buffer metrics
assert_matches!(