diff --git a/ingester2/benches/wal.rs b/ingester2/benches/wal.rs index b72ab786b3..5d197b78d9 100644 --- a/ingester2/benches/wal.rs +++ b/ingester2/benches/wal.rs @@ -112,7 +112,7 @@ impl DmlSink for NopSink { } } -impl ingester2::wal::benches::PartitionIter for NopSink { +impl ingester2::partition_iter::PartitionIter for NopSink { fn partition_iter( &self, ) -> Box>> + Send> { diff --git a/ingester2/src/buffer_tree/root.rs b/ingester2/src/buffer_tree/root.rs index b4f5177977..9373add813 100644 --- a/ingester2/src/buffer_tree/root.rs +++ b/ingester2/src/buffer_tree/root.rs @@ -16,6 +16,7 @@ use super::{ use crate::{ arcmap::ArcMap, dml_sink::DmlSink, + partition_iter::PartitionIter, query::{response::QueryResponse, tracing::QueryExecTracing, QueryError, QueryExec}, }; @@ -214,6 +215,15 @@ where } } +impl PartitionIter for crate::buffer_tree::BufferTree +where + O: Send + Sync + Debug + 'static, +{ + fn partition_iter(&self) -> Box>> + Send> { + Box::new(self.partitions()) + } +} + #[cfg(test)] mod tests { use std::{sync::Arc, time::Duration}; diff --git a/ingester2/src/init/wal_replay.rs b/ingester2/src/init/wal_replay.rs index 23076d783d..292e77171e 100644 --- a/ingester2/src/init/wal_replay.rs +++ b/ingester2/src/init/wal_replay.rs @@ -9,8 +9,8 @@ use wal::{SequencedWalOp, Wal}; use crate::{ dml_sink::{DmlError, DmlSink}, + partition_iter::PartitionIter, persist::{drain_buffer::persist_partitions, queue::PersistQueue}, - wal::rotate_task::PartitionIter, TRANSITION_SHARD_INDEX, }; diff --git a/ingester2/src/lib.rs b/ingester2/src/lib.rs index c349c0acfd..0fd242307a 100644 --- a/ingester2/src/lib.rs +++ b/ingester2/src/lib.rs @@ -100,6 +100,7 @@ maybe_pub!(mod buffer_tree); mod deferred_load; maybe_pub!(mod dml_sink); maybe_pub!(mod persist); +maybe_pub!(mod partition_iter); mod query; mod query_adaptor; pub(crate) mod server; diff --git a/ingester2/src/partition_iter.rs b/ingester2/src/partition_iter.rs new file mode 100644 index 0000000000..2944795770 --- /dev/null +++ b/ingester2/src/partition_iter.rs @@ -0,0 +1,20 @@ +use parking_lot::Mutex; +use std::{fmt::Debug, sync::Arc}; + +use crate::buffer_tree::partition::PartitionData; + +/// An abstraction over any type that can yield an iterator of (potentially +/// empty) [`PartitionData`]. +pub trait PartitionIter: Send + Debug { + /// Return the set of partitions in `self`. + fn partition_iter(&self) -> Box>> + Send>; +} + +impl PartitionIter for Arc +where + T: PartitionIter + Send + Sync, +{ + fn partition_iter(&self) -> Box>> + Send> { + (**self).partition_iter() + } +} diff --git a/ingester2/src/wal/mod.rs b/ingester2/src/wal/mod.rs index bc09dd28f8..343f6c0a76 100644 --- a/ingester2/src/wal/mod.rs +++ b/ingester2/src/wal/mod.rs @@ -7,7 +7,3 @@ pub(crate) mod rotate_task; mod traits; pub(crate) mod wal_sink; - -maybe_pub! { - pub use super::rotate_task::*; -} diff --git a/ingester2/src/wal/rotate_task.rs b/ingester2/src/wal/rotate_task.rs index 19f7d67489..bae8853f1c 100644 --- a/ingester2/src/wal/rotate_task.rs +++ b/ingester2/src/wal/rotate_task.rs @@ -1,37 +1,11 @@ use observability_deps::tracing::*; -use parking_lot::Mutex; -use std::{fmt::Debug, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use crate::{ - buffer_tree::partition::PartitionData, + partition_iter::PartitionIter, persist::{drain_buffer::persist_partitions, queue::PersistQueue}, }; -/// An abstraction over any type that can yield an iterator of (potentially -/// empty) [`PartitionData`]. -pub trait PartitionIter: Send + Debug { - /// Return the set of partitions in `self`. - fn partition_iter(&self) -> Box>> + Send>; -} - -impl PartitionIter for Arc -where - T: PartitionIter + Send + Sync, -{ - fn partition_iter(&self) -> Box>> + Send> { - (**self).partition_iter() - } -} - -impl PartitionIter for crate::buffer_tree::BufferTree -where - O: Send + Sync + Debug + 'static, -{ - fn partition_iter(&self) -> Box>> + Send> { - Box::new(self.partitions()) - } -} - /// Rotate the `wal` segment file every `period` duration of time. pub(crate) async fn periodic_rotation( wal: Arc,