refactor: ensure that we only have a partition ID ONCE within the pipeline (#6774)

* refactor: `MockPartitionDoneSink::{errors => results}`

* feat: ensure we don't concurrently process the same partition

Helps w/ #6750.

* test: extend `test_unique`

* docs: explain `unique_partitions`

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2023-02-01 16:42:40 +01:00 committed by GitHub
parent 0a20bd404e
commit d16eaac64b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 265 additions and 27 deletions

View File

@ -0,0 +1,3 @@
//! Combinations of multiple components that together can achieve one goal.
pub mod unique_partitions;

View File

@ -0,0 +1,230 @@
//! Ensure that partitions flowing through the pipeline are unique.
use std::{
collections::HashSet,
fmt::Display,
sync::{Arc, Mutex},
};
use async_trait::async_trait;
use data_types::PartitionId;
use crate::components::{
partition_done_sink::PartitionDoneSink, partitions_source::PartitionsSource,
};
/// Ensures that a unique set of partitions is flowing through the critical section of the compactor pipeline.
///
/// This should be used as a wrapper around the actual [`PartitionsSource`] and [`PartitionDoneSink`] and will set up
/// the following stream layout:
///
/// ```text
/// (1)====>(2)====>[concurrent processing]---->(3)---->(4)---->(5)
///          ^                                           |
///          |                                           |
///          |                                           |
///          +-------------------------------------------+
/// ```
///
/// | Step | Name                  | Type                                                        | Description |
/// | ---- | --------------------- | ----------------------------------------------------------- | ----------- |
/// | 1    | **Actual source**     | `inner_source`/`T1`/[`PartitionsSource`], wrapped           | This is the actual source, e.g. a [catalog](crate::components::partitions_source::catalog::CatalogPartitionsSource) |
/// | 2    | **Unique IDs source** | [`UniquePartionsSourceWrapper`], wraps `inner_source`/`T1`  | Outputs the [`PartitionId`]s from the `inner_source` but filters out partitions that are still in flight, i.e. that were emitted before and have not yet reached the uniqueness sink (step 4) |
/// | 3    | **Critical section**  | --                                                          | Here it is always ensured that a single [`PartitionId`] does NOT occur more than once. |
/// | 4    | **Unique IDs sink**   | [`UniquePartitionDoneSinkWrapper`], wraps `inner_sink`/`T2` | Observes incoming IDs and removes them from the filter applied in step 2. |
/// | 5    | **Actual sink**       | `inner_sink`/`T2`/[`PartitionDoneSink`], wrapped            | The actual sink. |
///
/// The returned source and sink wrappers share one set of in-flight partition IDs, so they must always be used as a
/// pair.
pub fn unique_partitions<T1, T2>(
    inner_source: T1,
    inner_sink: T2,
) -> (
    UniquePartionsSourceWrapper<T1>,
    UniquePartitionDoneSinkWrapper<T2>,
)
where
    T1: PartitionsSource,
    T2: PartitionDoneSink,
{
    // One shared, initially empty set: the source wrapper inserts IDs, the sink wrapper removes them.
    let shared_ids = Arc::new(Mutex::new(HashSet::new()));

    (
        UniquePartionsSourceWrapper {
            inner: inner_source,
            in_flight: Arc::clone(&shared_ids),
        },
        UniquePartitionDoneSinkWrapper {
            inner: inner_sink,
            in_flight: shared_ids,
        },
    )
}
/// Set of partition IDs that were emitted by the source wrapper and have not yet been recorded by the sink wrapper.
///
/// The same set is shared (via [`Arc`]) between both wrappers.
type InFlight = Arc<Mutex<HashSet<PartitionId>>>;
/// [`PartitionsSource`] wrapper that only emits partition IDs which are not already in flight.
///
/// Created by [`unique_partitions`] together with the matching [`UniquePartitionDoneSinkWrapper`].
#[derive(Debug)]
pub struct UniquePartionsSourceWrapper<T>
where
T: PartitionsSource,
{
// The wrapped source that provides the unfiltered partition IDs.
inner: T,
// IDs currently in flight; shared with the sink wrapper, which removes them again.
in_flight: InFlight,
}
impl<T> Display for UniquePartionsSourceWrapper<T>
where
    T: PartitionsSource,
{
    /// Renders the wrapper as `unique(<inner>)`, where `<inner>` is the display output of the wrapped source.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let wrapped = &self.inner;
        f.write_fmt(format_args!("unique({})", wrapped))
    }
}
#[async_trait]
impl<T> PartitionsSource for UniquePartionsSourceWrapper<T>
where
    T: PartitionsSource,
{
    /// Fetches IDs from the wrapped source and returns only those that are not in flight yet.
    ///
    /// Every returned ID is added to the in-flight set. Duplicates within a single fetch are therefore emitted once,
    /// and the order of the wrapped source is preserved.
    async fn fetch(&self) -> Vec<PartitionId> {
        let candidates = self.inner.fetch().await;

        // Take the lock only after the await so the guard is never held across a suspension point.
        let mut in_flight = self.in_flight.lock().expect("not poisoned");

        let mut fresh = Vec::new();
        for id in candidates {
            // `insert` returns `true` only when the ID was not in the set before.
            if in_flight.insert(id) {
                fresh.push(id);
            }
        }
        fresh
    }
}
/// [`PartitionDoneSink`] wrapper that removes finished partition IDs from the in-flight set.
///
/// Created by [`unique_partitions`] together with the matching [`UniquePartionsSourceWrapper`].
#[derive(Debug)]
pub struct UniquePartitionDoneSinkWrapper<T>
where
T: PartitionDoneSink,
{
// The wrapped sink that receives every recorded result.
inner: T,
// IDs currently in flight; shared with the source wrapper, which inserts them.
in_flight: InFlight,
}
impl<T> Display for UniquePartitionDoneSinkWrapper<T>
where
    T: PartitionDoneSink,
{
    /// Renders the wrapper as `unique(<inner>)`, where `<inner>` is the display output of the wrapped sink.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let wrapped = &self.inner;
        f.write_fmt(format_args!("unique({})", wrapped))
    }
}
#[async_trait]
impl<T> PartitionDoneSink for UniquePartitionDoneSinkWrapper<T>
where
    T: PartitionDoneSink,
{
    /// Marks `partition` as no longer in flight and forwards the result to the wrapped sink.
    ///
    /// # Panics
    ///
    /// Panics if `partition` is not in the in-flight set, i.e. it was never emitted by the paired source wrapper or
    /// it was already recorded.
    async fn record(
        &self,
        partition: PartitionId,
        res: Result<(), Box<dyn std::error::Error + Send + Sync>>,
    ) {
        // The lock guard is a temporary of this statement, so the mutex is already released when the check below
        // runs. Panicking while still holding the guard would poison the mutex.
        let was_in_flight = self
            .in_flight
            .lock()
            .expect("not poisoned")
            .remove(&partition);

        if !was_in_flight {
            panic!("Unknown or already done partition in sink: {partition}");
        }

        // Call the wrapped sink last, because the wrapping order is:
        //
        // - wrapped source
        // - unique source
        // - ...
        // - unique sink
        // - wrapped sink
        self.inner.record(partition, res).await;
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use crate::components::{
partition_done_sink::mock::MockPartitionDoneSink,
partitions_source::mock::MockPartitionsSource,
};
use super::*;
// Both wrappers render as `unique(<inner>)`; the mocks used here display as `mock`.
#[test]
fn test_display() {
let (source, sink) = unique_partitions(
MockPartitionsSource::new(vec![]),
MockPartitionDoneSink::new(),
);
assert_eq!(source.to_string(), "unique(mock)");
assert_eq!(sink.to_string(), "unique(mock)");
}
// Walks through several fetch/record rounds and checks which IDs the source wrapper lets through.
#[tokio::test]
async fn test_unique() {
let inner_source = Arc::new(MockPartitionsSource::new(vec![
PartitionId::new(1),
PartitionId::new(1),
PartitionId::new(2),
PartitionId::new(3),
PartitionId::new(4),
]));
let inner_sink = Arc::new(MockPartitionDoneSink::new());
let (source, sink) = unique_partitions(Arc::clone(&inner_source), Arc::clone(&inner_sink));
// The duplicate ID 1 delivered by the inner source is emitted only once.
assert_eq!(
source.fetch().await,
vec![
PartitionId::new(1),
PartitionId::new(2),
PartitionId::new(3),
PartitionId::new(4),
],
);
// Finish partitions 1 and 2; partitions 3 and 4 stay in flight.
sink.record(PartitionId::new(1), Ok(())).await;
sink.record(PartitionId::new(2), Ok(())).await;
// The recorded results are forwarded to the wrapped sink.
assert_eq!(
inner_sink.results(),
HashMap::from([(PartitionId::new(1), Ok(())), (PartitionId::new(2), Ok(()))]),
);
inner_source.set(vec![
PartitionId::new(1),
PartitionId::new(3),
PartitionId::new(5),
]);
// 1 was finished and may be emitted again, 3 is still in flight and is filtered out, 5 is new.
assert_eq!(
source.fetch().await,
vec![PartitionId::new(1), PartitionId::new(5)],
);
// Finishing with an error also removes the partition from the in-flight set.
sink.record(PartitionId::new(1), Err(String::from("foo").into()))
.await;
// 3 and 5 are still in flight, so only 1 is emitted again.
assert_eq!(source.fetch().await, vec![PartitionId::new(1)],);
// The wrapped sink now holds the error as the result for partition 1.
assert_eq!(
inner_sink.results(),
HashMap::from([
(PartitionId::new(1), Err(String::from("foo"))),
(PartitionId::new(2), Ok(()))
]),
);
}
// Recording the same partition twice must panic: the first call already removed it from the in-flight set.
#[tokio::test]
#[should_panic(expected = "Unknown or already done partition in sink: 1")]
async fn test_panic_sink_unknown() {
let (source, sink) = unique_partitions(
MockPartitionsSource::new(vec![PartitionId::new(1)]),
MockPartitionDoneSink::new(),
);
let ids = source.fetch().await;
assert_eq!(ids.len(), 1);
let id = ids[0];
sink.record(id, Ok(())).await;
sink.record(id, Ok(())).await;
}
}

View File

@ -17,6 +17,7 @@ use crate::{
};
use super::{
combos::unique_partitions::unique_partitions,
commit::{
catalog::CatalogCommit, logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper,
mock::MockCommit, Commit,
@ -116,6 +117,26 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
Arc::clone(&config.catalog),
))
};
let partition_done_sink =
LoggingPartitionDoneSinkWrapper::new(MetricsPartitionDoneSinkWrapper::new(
ErrorKindPartitionDoneSinkWrapper::new(
partition_done_sink,
ErrorKind::variants()
.iter()
.filter(|kind| {
// use explicit match statement so we never forget to add new variants
match kind {
ErrorKind::OutOfMemory | ErrorKind::Timeout | ErrorKind::Unknown => {
true
}
ErrorKind::ObjectStore => false,
}
})
.copied()
.collect(),
),
&config.metric_registry,
));
let commit: Arc<dyn Commit> = if config.shadow_mode {
Arc::new(MockCommit::new())
@ -132,6 +153,9 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
Arc::clone(config.parquet_store_real.object_store())
};
let (partitions_source, partition_done_sink) =
unique_partitions(partitions_source, partition_done_sink);
Arc::new(Components {
// Note: Place "not empty" wrapper at the very last so that the logging and metric wrapper work even when there
// is not data.
@ -169,27 +193,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
&config.metric_registry,
),
)),
partition_done_sink: Arc::new(LoggingPartitionDoneSinkWrapper::new(
MetricsPartitionDoneSinkWrapper::new(
ErrorKindPartitionDoneSinkWrapper::new(
partition_done_sink,
ErrorKind::variants()
.iter()
.filter(|kind| {
// use explicit match statement so we never forget to add new variants
match kind {
ErrorKind::OutOfMemory
| ErrorKind::Timeout
| ErrorKind::Unknown => true,
ErrorKind::ObjectStore => false,
}
})
.copied()
.collect(),
),
&config.metric_registry,
),
)),
partition_done_sink: Arc::new(partition_done_sink),
commit: Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new(
commit,
&config.metric_registry,

View File

@ -9,6 +9,7 @@ use self::{
round_split::RoundSplit, scratchpad::ScratchpadGen, tables_source::TablesSource,
};
pub mod combos;
pub mod commit;
pub mod df_plan_exec;
pub mod df_planner;

View File

@ -96,7 +96,7 @@ mod tests {
sink.record(PartitionId::new(4), Ok(())).await;
assert_eq!(
inner.errors(),
inner.results(),
HashMap::from([
(
PartitionId::new(1),

View File

@ -99,7 +99,7 @@ level = INFO; message = Finished partition; partition_id = 3; ",
);
assert_eq!(
inner.errors(),
inner.results(),
HashMap::from([
(
PartitionId::new(1),

View File

@ -119,7 +119,7 @@ mod tests {
assert_eq!(error_counter(&registry, "object_store"), 1);
assert_eq!(
inner.errors(),
inner.results(),
HashMap::from([
(
PartitionId::new(1),

View File

@ -18,7 +18,7 @@ impl MockPartitionDoneSink {
}
#[allow(dead_code)] // not used anywhere
pub fn errors(&self) -> HashMap<PartitionId, Result<(), String>> {
pub fn results(&self) -> HashMap<PartitionId, Result<(), String>> {
self.last.lock().expect("not poisoned").clone()
}
}
@ -52,7 +52,7 @@ mod tests {
async fn test_record() {
let sink = MockPartitionDoneSink::new();
assert_eq!(sink.errors(), HashMap::default(),);
assert_eq!(sink.results(), HashMap::default(),);
sink.record(PartitionId::new(1), Err("msg 1".into())).await;
sink.record(PartitionId::new(2), Err("msg 2".into())).await;
@ -60,7 +60,7 @@ mod tests {
sink.record(PartitionId::new(3), Ok(())).await;
assert_eq!(
sink.errors(),
sink.results(),
HashMap::from([
(PartitionId::new(1), Err(String::from("msg 3"))),
(PartitionId::new(2), Err(String::from("msg 2"))),