diff --git a/ingester/src/stream_handler/mock_sink.rs b/ingester/src/dml_sink/mock_sink.rs similarity index 100% rename from ingester/src/stream_handler/mock_sink.rs rename to ingester/src/dml_sink/mock_sink.rs diff --git a/ingester/src/dml_sink/mod.rs b/ingester/src/dml_sink/mod.rs new file mode 100644 index 0000000000..7aed0b7a52 --- /dev/null +++ b/ingester/src/dml_sink/mod.rs @@ -0,0 +1,5 @@ +mod r#trait; +pub(crate) use r#trait::*; + +#[cfg(test)] +pub(crate) mod mock_sink; diff --git a/ingester/src/stream_handler/sink.rs b/ingester/src/dml_sink/trait.rs similarity index 100% rename from ingester/src/stream_handler/sink.rs rename to ingester/src/dml_sink/trait.rs diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs index 30e3fbc005..9dd8ae0111 100644 --- a/ingester/src/lib.rs +++ b/ingester/src/lib.rs @@ -20,6 +20,8 @@ mod arcmap; pub(crate) mod compact; pub mod data; +mod deferred_load; +mod dml_sink; pub mod handler; mod job; pub mod lifecycle; @@ -28,7 +30,5 @@ pub mod querier_handler; pub(crate) mod query_adaptor; pub mod server; pub(crate) mod stream_handler; - -mod deferred_load; #[cfg(test)] pub(crate) mod test_util; diff --git a/ingester/src/server/grpc/rpc_write.rs b/ingester/src/server/grpc/rpc_write.rs index 5d6f60b3b9..26956d6464 100644 --- a/ingester/src/server/grpc/rpc_write.rs +++ b/ingester/src/server/grpc/rpc_write.rs @@ -9,7 +9,7 @@ use observability_deps::tracing::*; use thiserror::Error; use tonic::{Request, Response}; -use crate::{data::DmlApplyAction, stream_handler::DmlSink}; +use crate::{data::DmlApplyAction, dml_sink::DmlSink}; // A list of error states when handling an RPC write request. // @@ -177,7 +177,7 @@ mod tests { Column, DatabaseBatch, TableBatch, }; - use crate::stream_handler::mock_sink::MockDmlSink; + use crate::dml_sink::mock_sink::MockDmlSink; use super::*; diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 94489d34e4..ed46e42a78 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -1,8 +1,8 @@ //! A handler of streamed ops from a write buffer. -use super::DmlSink; use crate::{ data::DmlApplyAction, + dml_sink::DmlSink, lifecycle::{LifecycleHandle, LifecycleHandleImpl}, }; use data_types::{SequenceNumber, ShardId, ShardIndex}; @@ -510,8 +510,8 @@ fn metric_attrs( mod tests { use super::*; use crate::{ + dml_sink::mock_sink::MockDmlSink, lifecycle::{LifecycleConfig, LifecycleManager}, - stream_handler::mock_sink::MockDmlSink, }; use assert_matches::assert_matches; use async_trait::async_trait; diff --git a/ingester/src/stream_handler/mod.rs b/ingester/src/stream_handler/mod.rs index 5e9a351fe4..1a7f828c39 100644 --- a/ingester/src/stream_handler/mod.rs +++ b/ingester/src/stream_handler/mod.rs @@ -16,17 +16,13 @@ //! [`WriteBufferReading`]: write_buffer::core::WriteBufferReading //! [`LifecycleManager`]: crate::lifecycle::LifecycleManager //! [`LifecycleHandle::can_resume_ingest()`]: crate::lifecycle::LifecycleHandle::can_resume_ingest() +//! [`DmlSink`]: crate::dml_sink::DmlSink pub(crate) mod handler; mod periodic_watermark_fetcher; -mod sink; -#[cfg(test)] -pub mod mock_sink; #[cfg(test)] pub mod mock_watermark_fetcher; pub(crate) mod sink_adaptor; pub(crate) mod sink_instrumentation; - pub(crate) use periodic_watermark_fetcher::*; -pub(crate) use sink::*; diff --git a/ingester/src/stream_handler/sink_adaptor.rs b/ingester/src/stream_handler/sink_adaptor.rs index 3780f3da5b..30490ccf75 100644 --- a/ingester/src/stream_handler/sink_adaptor.rs +++ b/ingester/src/stream_handler/sink_adaptor.rs @@ -6,9 +6,9 @@ use async_trait::async_trait; use data_types::ShardId; use dml::DmlOperation; -use super::DmlSink; use crate::{ data::{DmlApplyAction, IngesterData}, + dml_sink::DmlSink, lifecycle::LifecycleHandleImpl, }; diff --git a/ingester/src/stream_handler/sink_instrumentation.rs b/ingester/src/stream_handler/sink_instrumentation.rs index ab74edd895..b7548bf583 100644 --- a/ingester/src/stream_handler/sink_instrumentation.rs +++ b/ingester/src/stream_handler/sink_instrumentation.rs @@ -9,9 +9,7 @@ use iox_time::{SystemProvider, TimeProvider}; use metric::{Attributes, DurationHistogram, U64Counter, U64Gauge}; use trace::span::{SpanExt, SpanRecorder}; -use crate::data::DmlApplyAction; - -use super::DmlSink; +use crate::{data::DmlApplyAction, dml_sink::DmlSink}; /// A [`WatermarkFetcher`] abstracts a source of the write buffer high watermark /// (max known offset). @@ -237,8 +235,9 @@ where #[cfg(test)] mod tests { use super::*; - use crate::stream_handler::{ - mock_sink::MockDmlSink, mock_watermark_fetcher::MockWatermarkFetcher, + use crate::{ + dml_sink::mock_sink::MockDmlSink, + stream_handler::mock_watermark_fetcher::MockWatermarkFetcher, }; use assert_matches::assert_matches; use data_types::{NamespaceId, Sequence, SequenceNumber, ShardId, TableId};