diff --git a/compactor2/src/components/combos/mod.rs b/compactor2/src/components/combos/mod.rs
new file mode 100644
index 0000000000..944bec0265
--- /dev/null
+++ b/compactor2/src/components/combos/mod.rs
@@ -0,0 +1,3 @@
+//! Combinations of multiple components that together can achieve one goal.
+
+pub mod unique_partitions;
diff --git a/compactor2/src/components/combos/unique_partitions.rs b/compactor2/src/components/combos/unique_partitions.rs
new file mode 100644
index 0000000000..5547ade370
--- /dev/null
+++ b/compactor2/src/components/combos/unique_partitions.rs
@@ -0,0 +1,230 @@
+//! Ensure that partitions flowing through the pipeline are unique.
+
+use std::{
+    collections::HashSet,
+    fmt::Display,
+    sync::{Arc, Mutex},
+};
+
+use async_trait::async_trait;
+use data_types::PartitionId;
+
+use crate::components::{
+    partition_done_sink::PartitionDoneSink, partitions_source::PartitionsSource,
+};
+
+/// Ensures that a unique set of partitions is flowing through the critical section of the compactor pipeline.
+///
+/// This should be used as a wrapper around the actual [`PartitionsSource`] and [`PartitionDoneSink`] and will set up
+/// the following stream layout:
+///
+/// ```text
+/// (1)====>(2)====>[concurrent processing]---->(3)---->(4)---->(5)
+///          ^                                           |
+///          |                                           |
+///          |                                           |
+///          +-------------------------------------------+
+/// ```
+///
+/// | Step | Name                  | Type                                                        | Description |
+/// | ---- | --------------------- | ----------------------------------------------------------- | ----------- |
+/// | 1    | **Actual source**     | `inner_source`/`T1`/[`PartitionsSource`], wrapped           | This is the actual source, e.g. a [catalog](crate::components::partitions_source::catalog::CatalogPartitionsSource) |
+/// | 2    | **Unique IDs source** | [`UniquePartionsSourceWrapper`], wraps `inner_source`/`T1`  | Outputs the [`PartitionId`]s from the `inner_source` but filters out partitions that have not yet reached the uniqueness sink (step 4) |
+/// | 3    | **Critical section**  | --                                                          | Here it is always ensured that a single [`PartitionId`] does NOT occur more than once. |
+/// | 4    | **Unique IDs sink**   | [`UniquePartitionDoneSinkWrapper`], wraps `inner_sink`/`T2` | Observes incoming IDs and removes them from the filter applied in step 2. |
+/// | 5    | **Actual sink**       | `inner_sink`/`T2`/[`PartitionDoneSink`], wrapped            | The actual sink. |
+pub fn unique_partitions<T1, T2>(
+    inner_source: T1,
+    inner_sink: T2,
+) -> (
+    UniquePartionsSourceWrapper<T1>,
+    UniquePartitionDoneSinkWrapper<T2>,
+)
+where
+    T1: PartitionsSource,
+    T2: PartitionDoneSink,
+{
+    let in_flight = Arc::new(Mutex::new(HashSet::default()));
+    let source = UniquePartionsSourceWrapper {
+        inner: inner_source,
+        in_flight: Arc::clone(&in_flight),
+    };
+    let sink = UniquePartitionDoneSinkWrapper {
+        inner: inner_sink,
+        in_flight,
+    };
+    (source, sink)
+}
+
+type InFlight = Arc<Mutex<HashSet<PartitionId>>>;
+
+#[derive(Debug)]
+pub struct UniquePartionsSourceWrapper<T>
+where
+    T: PartitionsSource,
+{
+    inner: T,
+    in_flight: InFlight,
+}
+
+impl<T> Display for UniquePartionsSourceWrapper<T>
+where
+    T: PartitionsSource,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "unique({})", self.inner)
+    }
+}
+
+#[async_trait]
+impl<T> PartitionsSource for UniquePartionsSourceWrapper<T>
+where
+    T: PartitionsSource,
+{
+    async fn fetch(&self) -> Vec<PartitionId> {
+        let res = self.inner.fetch().await;
+
+        let mut guard = self.in_flight.lock().expect("not poisoned");
+        res.into_iter().filter(|id| guard.insert(*id)).collect()
+    }
+}
+
+#[derive(Debug)]
+pub struct UniquePartitionDoneSinkWrapper<T>
+where
+    T: PartitionDoneSink,
+{
+    inner: T,
+    in_flight: InFlight,
+}
+
+impl<T> Display for UniquePartitionDoneSinkWrapper<T>
+where
+    T: PartitionDoneSink,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "unique({})", self.inner)
+    }
+}
+
+#[async_trait]
+impl<T> PartitionDoneSink for UniquePartitionDoneSinkWrapper<T>
+where
+    T: PartitionDoneSink,
+{
+    async fn record(
+        &self,
+        partition: PartitionId,
+        res: Result<(), Box<dyn std::error::Error + Send + Sync>>,
+    ) {
+        let existing = {
+            let mut guard = self.in_flight.lock().expect("not poisoned");
+            guard.remove(&partition)
+        };
+        // perform check when NOT holding the mutex to not poison it
+        assert!(
+            existing,
+            "Unknown or already done partition in sink: {partition}"
+        );
+
+        // perform inner last, because the wrapping order is:
+        //
+        // - wrapped source
+        // - unique source
+        // - ...
+        // - unique sink
+        // - wrapped sink
+        self.inner.record(partition, res).await;
+    }
+}
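The `fetch` implementation above hinges on `HashSet::insert` returning `false` when the value is already present, so the `filter` call drops every ID that is still in flight. A minimal standalone sketch of that idiom (plain `std`, nothing IOx-specific):

```rust
use std::collections::HashSet;

fn main() {
    let mut in_flight = HashSet::new();
    // insert() returns true only for IDs not already tracked, so duplicates
    // and still-in-flight IDs fall out of the iterator.
    let unique: Vec<i64> = vec![1, 1, 2, 3, 2]
        .into_iter()
        .filter(|id| in_flight.insert(*id))
        .collect();
    assert_eq!(unique, vec![1, 2, 3]);
}
```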
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use crate::components::{
+        partition_done_sink::mock::MockPartitionDoneSink,
+        partitions_source::mock::MockPartitionsSource,
+    };
+
+    use super::*;
+
+    #[test]
+    fn test_display() {
+        let (source, sink) = unique_partitions(
+            MockPartitionsSource::new(vec![]),
+            MockPartitionDoneSink::new(),
+        );
+        assert_eq!(source.to_string(), "unique(mock)");
+        assert_eq!(sink.to_string(), "unique(mock)");
+    }
+
+    #[tokio::test]
+    async fn test_unique() {
+        let inner_source = Arc::new(MockPartitionsSource::new(vec![
+            PartitionId::new(1),
+            PartitionId::new(1),
+            PartitionId::new(2),
+            PartitionId::new(3),
+            PartitionId::new(4),
+        ]));
+        let inner_sink = Arc::new(MockPartitionDoneSink::new());
+        let (source, sink) = unique_partitions(Arc::clone(&inner_source), Arc::clone(&inner_sink));
+
+        assert_eq!(
+            source.fetch().await,
+            vec![
+                PartitionId::new(1),
+                PartitionId::new(2),
+                PartitionId::new(3),
+                PartitionId::new(4),
+            ],
+        );
+
+        sink.record(PartitionId::new(1), Ok(())).await;
+        sink.record(PartitionId::new(2), Ok(())).await;
+
+        assert_eq!(
+            inner_sink.results(),
+            HashMap::from([(PartitionId::new(1), Ok(())), (PartitionId::new(2), Ok(()))]),
+        );
+
+        inner_source.set(vec![
+            PartitionId::new(1),
+            PartitionId::new(3),
+            PartitionId::new(5),
+        ]);
+
+        assert_eq!(
+            source.fetch().await,
+            vec![PartitionId::new(1), PartitionId::new(5)],
+        );
+
+        sink.record(PartitionId::new(1), Err(String::from("foo").into()))
+            .await;
+
+        assert_eq!(source.fetch().await, vec![PartitionId::new(1)],);
+
+        assert_eq!(
+            inner_sink.results(),
+            HashMap::from([
+                (PartitionId::new(1), Err(String::from("foo"))),
+                (PartitionId::new(2), Ok(()))
+            ]),
+        );
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "Unknown or already done partition in sink: 1")]
+    async fn test_panic_sink_unknown() {
+        let (source, sink) = unique_partitions(
+            MockPartitionsSource::new(vec![PartitionId::new(1)]),
+            MockPartitionDoneSink::new(),
+        );
+        let ids = source.fetch().await;
+        assert_eq!(ids.len(), 1);
+        let id = ids[0];
+        sink.record(id, Ok(())).await;
+        sink.record(id, Ok(())).await;
+    }
+}
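Source and sink only work as a pair because they share the in-flight set. A minimal usage sketch in the style of the tests above (it assumes the same mock components and a Tokio runtime; not part of the diff):

```rust
#[tokio::test]
async fn unique_pair_sketch() {
    let (source, sink) = unique_partitions(
        MockPartitionsSource::new(vec![PartitionId::new(1), PartitionId::new(1)]),
        MockPartitionDoneSink::new(),
    );

    // Duplicates are filtered on the way in ...
    assert_eq!(source.fetch().await, vec![PartitionId::new(1)]);
    // ... and the ID stays blocked while it is in flight ...
    assert_eq!(source.fetch().await, vec![]);
    // ... until the sink records it as done, which re-arms it.
    sink.record(PartitionId::new(1), Ok(())).await;
    assert_eq!(source.fetch().await, vec![PartitionId::new(1)]);
}
```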
diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs
index 9f5409f74d..a10a8dbfa9 100644
--- a/compactor2/src/components/hardcoded.rs
+++ b/compactor2/src/components/hardcoded.rs
@@ -17,6 +17,7 @@ use crate::{
 };
 
 use super::{
+    combos::unique_partitions::unique_partitions,
     commit::{
         catalog::CatalogCommit, logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper,
         mock::MockCommit, Commit,
@@ -116,6 +117,26 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
             Arc::clone(&config.catalog),
         ))
     };
+    let partition_done_sink =
+        LoggingPartitionDoneSinkWrapper::new(MetricsPartitionDoneSinkWrapper::new(
+            ErrorKindPartitionDoneSinkWrapper::new(
+                partition_done_sink,
+                ErrorKind::variants()
+                    .iter()
+                    .filter(|kind| {
+                        // use explicit match statement so we never forget to add new variants
+                        match kind {
+                            ErrorKind::OutOfMemory | ErrorKind::Timeout | ErrorKind::Unknown => {
+                                true
+                            }
+                            ErrorKind::ObjectStore => false,
+                        }
+                    })
+                    .copied()
+                    .collect(),
+            ),
+            &config.metric_registry,
+        ));
 
     let commit: Arc<dyn Commit> = if config.shadow_mode {
         Arc::new(MockCommit::new())
@@ -132,6 +153,9 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
         Arc::clone(config.parquet_store_real.object_store())
     };
 
+    let (partitions_source, partition_done_sink) =
+        unique_partitions(partitions_source, partition_done_sink);
+
     Arc::new(Components {
         // Note: Place "not empty" wrapper at the very last so that the logging and metric wrapper work even when there
        // is no data.
@@ -169,27 +193,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
                 &config.metric_registry,
             ),
         )),
-        partition_done_sink: Arc::new(LoggingPartitionDoneSinkWrapper::new(
-            MetricsPartitionDoneSinkWrapper::new(
-                ErrorKindPartitionDoneSinkWrapper::new(
-                    partition_done_sink,
-                    ErrorKind::variants()
-                        .iter()
-                        .filter(|kind| {
-                            // use explicit match statement so we never forget to add new variants
-                            match kind {
-                                ErrorKind::OutOfMemory
-                                | ErrorKind::Timeout
-                                | ErrorKind::Unknown => true,
-                                ErrorKind::ObjectStore => false,
-                            }
-                        })
-                        .copied()
-                        .collect(),
-                ),
-                &config.metric_registry,
-            ),
-        )),
+        partition_done_sink: Arc::new(partition_done_sink),
         commit: Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new(
             commit,
             &config.metric_registry,
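The variant filter moved into the new `partition_done_sink` block deliberately uses an exhaustive `match` instead of a `_` catch-all: adding a new `ErrorKind` variant then fails compilation until this call site is revisited. A self-contained sketch of that pattern, using a stand-in enum (the real `ErrorKind` lives in another module but has exactly these four variants):

```rust
// Stand-in for the compactor's ErrorKind, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Kind {
    OutOfMemory,
    Timeout,
    Unknown,
    ObjectStore,
}

fn main() {
    let all = [Kind::OutOfMemory, Kind::Timeout, Kind::Unknown, Kind::ObjectStore];
    let retained: Vec<Kind> = all
        .iter()
        // no `_` arm: a fifth variant would be a compile error until handled here
        .filter(|kind| match kind {
            Kind::OutOfMemory | Kind::Timeout | Kind::Unknown => true,
            Kind::ObjectStore => false,
        })
        .copied()
        .collect();
    assert_eq!(retained, vec![Kind::OutOfMemory, Kind::Timeout, Kind::Unknown]);
}
```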
Err("msg 2".into())).await; @@ -60,7 +60,7 @@ mod tests { sink.record(PartitionId::new(3), Ok(())).await; assert_eq!( - sink.errors(), + sink.results(), HashMap::from([ (PartitionId::new(1), Err(String::from("msg 3"))), (PartitionId::new(2), Err(String::from("msg 2"))),