Merge pull request #6548 from influxdata/dom/reuse-partition-iter

refactor: reuse PartitionIter
pull/24376/head
Dom 2023-01-10 12:13:36 +00:00 committed by GitHub
commit f607c7f0bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 35 additions and 34 deletions

View File

@ -112,7 +112,7 @@ impl DmlSink for NopSink {
}
}
impl ingester2::wal::benches::PartitionIter for NopSink {
impl ingester2::partition_iter::PartitionIter for NopSink {
fn partition_iter(
&self,
) -> Box<dyn Iterator<Item = std::sync::Arc<parking_lot::Mutex<PartitionData>>> + Send> {

View File

@ -16,6 +16,7 @@ use super::{
use crate::{
arcmap::ArcMap,
dml_sink::DmlSink,
partition_iter::PartitionIter,
query::{response::QueryResponse, tracing::QueryExecTracing, QueryError, QueryExec},
};
@ -214,6 +215,15 @@ where
}
}
impl<O> PartitionIter for crate::buffer_tree::BufferTree<O>
where
O: Send + Sync + Debug + 'static,
{
fn partition_iter(&self) -> Box<dyn Iterator<Item = Arc<Mutex<PartitionData>>> + Send> {
Box::new(self.partitions())
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};

View File

@ -9,8 +9,8 @@ use wal::{SequencedWalOp, Wal};
use crate::{
dml_sink::{DmlError, DmlSink},
partition_iter::PartitionIter,
persist::{drain_buffer::persist_partitions, queue::PersistQueue},
wal::rotate_task::PartitionIter,
TRANSITION_SHARD_INDEX,
};

View File

@ -100,6 +100,7 @@ maybe_pub!(mod buffer_tree);
mod deferred_load;
maybe_pub!(mod dml_sink);
maybe_pub!(mod persist);
maybe_pub!(mod partition_iter);
mod query;
mod query_adaptor;
pub(crate) mod server;

View File

@ -0,0 +1,20 @@
use parking_lot::Mutex;
use std::{fmt::Debug, sync::Arc};
use crate::buffer_tree::partition::PartitionData;
/// An abstraction over any type that can yield an iterator of (potentially
/// empty) [`PartitionData`].
pub trait PartitionIter: Send + Debug {
/// Return the set of partitions in `self`.
fn partition_iter(&self) -> Box<dyn Iterator<Item = Arc<Mutex<PartitionData>>> + Send>;
}
impl<T> PartitionIter for Arc<T>
where
T: PartitionIter + Send + Sync,
{
fn partition_iter(&self) -> Box<dyn Iterator<Item = Arc<Mutex<PartitionData>>> + Send> {
(**self).partition_iter()
}
}

View File

@ -7,7 +7,3 @@
pub(crate) mod rotate_task;
mod traits;
pub(crate) mod wal_sink;
maybe_pub! {
pub use super::rotate_task::*;
}

View File

@ -1,37 +1,11 @@
use observability_deps::tracing::*;
use parking_lot::Mutex;
use std::{fmt::Debug, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};
use crate::{
buffer_tree::partition::PartitionData,
partition_iter::PartitionIter,
persist::{drain_buffer::persist_partitions, queue::PersistQueue},
};
/// An abstraction over any type that can yield an iterator of (potentially
/// empty) [`PartitionData`].
pub trait PartitionIter: Send + Debug {
/// Return the set of partitions in `self`.
fn partition_iter(&self) -> Box<dyn Iterator<Item = Arc<Mutex<PartitionData>>> + Send>;
}
impl<T> PartitionIter for Arc<T>
where
T: PartitionIter + Send + Sync,
{
fn partition_iter(&self) -> Box<dyn Iterator<Item = Arc<Mutex<PartitionData>>> + Send> {
(**self).partition_iter()
}
}
impl<O> PartitionIter for crate::buffer_tree::BufferTree<O>
where
O: Send + Sync + Debug + 'static,
{
fn partition_iter(&self) -> Box<dyn Iterator<Item = Arc<Mutex<PartitionData>>> + Send> {
Box::new(self.partitions())
}
}
/// Rotate the `wal` segment file every `period` duration of time.
pub(crate) async fn periodic_rotation<T, P>(
wal: Arc<wal::Wal>,