feat(idpe-17789): move Compactor abstractions PartitionsSource and PartitionStream to use CompactionJob

pull/24376/head
wiedld 2023-07-24 15:16:35 -07:00
parent 817bd595ca
commit bab6f239ea
12 changed files with 88 additions and 71 deletions
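
In short: the compactor's two partition-feed abstractions stop yielding bare PartitionId values and instead yield the scheduler's CompactionJob, which wraps a PartitionId. The two signature changes, as they appear in the hunks below:

// Before: the compactor pulled bare partition IDs.
async fn fetch(&self) -> Vec<PartitionId>;          // PartitionsSource
fn stream(&self) -> BoxStream<'_, PartitionId>;     // PartitionStream

// After: it pulls scheduler-issued jobs, each wrapping a PartitionId.
async fn fetch(&self) -> Vec<CompactionJob>;        // PartitionsSource
fn stream(&self) -> BoxStream<'_, CompactionJob>;   // PartitionStream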

View File

@@ -1,6 +1,6 @@
 use std::{collections::VecDeque, fmt::Display, sync::Arc};

-use data_types::PartitionId;
+use compactor_scheduler::CompactionJob;
 use futures::{stream::BoxStream, StreamExt};

 use super::super::{
@@ -42,7 +42,7 @@ impl<T> PartitionStream for EndlessPartititionStream<T>
 where
     T: PartitionsSource,
 {
-    fn stream(&self) -> BoxStream<'_, PartitionId> {
+    fn stream(&self) -> BoxStream<'_, CompactionJob> {
         let source = Arc::clone(&self.source);

         // Note: we use a VecDeque as a buffer so we can preserve the order and cheaply remove the first element without
@@ -80,6 +80,8 @@ where
 #[cfg(test)]
 mod tests {
+    use data_types::PartitionId;
+
     use super::{super::super::partitions_source::mock::MockPartitionsSource, *};

     #[test]
@@ -91,9 +93,9 @@ mod tests {
     #[tokio::test]
     async fn test_stream() {
         let ids = vec![
-            PartitionId::new(1),
-            PartitionId::new(3),
-            PartitionId::new(2),
+            CompactionJob::new(PartitionId::new(1)),
+            CompactionJob::new(PartitionId::new(3)),
+            CompactionJob::new(PartitionId::new(2)),
         ];
         let stream = EndlessPartititionStream::new(MockPartitionsSource::new(ids.clone()));
@@ -102,13 +104,7 @@ mod tests {
         // we need to limit the stream at one point because it is endless
         assert_eq!(
             stream.stream().take(5).collect::<Vec<_>>().await,
-            vec![
-                PartitionId::new(1),
-                PartitionId::new(3),
-                PartitionId::new(2),
-                PartitionId::new(1),
-                PartitionId::new(3)
-            ],
+            [&ids[..], &ids[..2]].concat(),
         );
     }
 }
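
The rewritten expectation uses slice concatenation: the endless stream cycles through its three-element source, so taking five items yields the whole list followed by its first two entries again. A minimal runnable sketch of the idiom, with integers standing in for the CompactionJob values:

fn main() {
    // Stand-ins for the three CompactionJob values in the test above.
    let ids = vec![1, 3, 2];
    // Five items from an endless cycle = the full list plus its first two
    // elements; `concat` over the two slices builds exactly that expectation.
    let expected = [&ids[..], &ids[..2]].concat();
    assert_eq!(expected, vec![1, 3, 2, 1, 3]);
}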

View File

@@ -1,6 +1,6 @@
 use std::fmt::{Debug, Display};

-use data_types::PartitionId;
+use compactor_scheduler::CompactionJob;
 use futures::stream::BoxStream;

 pub mod endless;
@@ -8,8 +8,8 @@ pub mod once;
 /// Source for partitions.
 pub trait PartitionStream: Debug + Display + Send + Sync {
-    /// Create new source stream of partitions.
+    /// Create a new source stream of compaction jobs.
     ///
     /// This stream may be endless.
-    fn stream(&self) -> BoxStream<'_, PartitionId>;
+    fn stream(&self) -> BoxStream<'_, CompactionJob>;
 }
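
For implementors of this trait, only the item type changes. A self-contained sketch of a post-change implementation; the CompactionJob stand-in below is an assumption for illustration, the real type lives in compactor_scheduler:

use std::fmt::{Debug, Display};

use futures::stream::{self, BoxStream, StreamExt};

/// Stand-in for compactor_scheduler::CompactionJob (assumed shape).
#[derive(Debug, Clone, PartialEq)]
pub struct CompactionJob {
    pub partition_id: i64,
}

pub trait PartitionStream: Debug + Display + Send + Sync {
    /// Create a new source stream of compaction jobs.
    fn stream(&self) -> BoxStream<'_, CompactionJob>;
}

/// Toy implementation: yields a fixed list of jobs.
#[derive(Debug)]
pub struct FixedJobs(pub Vec<CompactionJob>);

impl Display for FixedJobs {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "fixed_jobs({})", self.0.len())
    }
}

impl PartitionStream for FixedJobs {
    fn stream(&self) -> BoxStream<'_, CompactionJob> {
        // Clone the backing list and box it up as the trait demands.
        stream::iter(self.0.clone()).boxed()
    }
}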

View File

@@ -1,6 +1,6 @@
 use std::{fmt::Display, sync::Arc};

-use data_types::PartitionId;
+use compactor_scheduler::CompactionJob;
 use futures::{stream::BoxStream, StreamExt};

 use super::{super::partitions_source::PartitionsSource, PartitionStream};
@@ -37,7 +37,7 @@ impl<T> PartitionStream for OncePartititionStream<T>
 where
     T: PartitionsSource,
 {
-    fn stream(&self) -> BoxStream<'_, PartitionId> {
+    fn stream(&self) -> BoxStream<'_, CompactionJob> {
         let source = Arc::clone(&self.source);
         futures::stream::once(async move { futures::stream::iter(source.fetch().await) })
             .flatten()
@@ -47,6 +47,8 @@ where
 #[cfg(test)]
 mod tests {
+    use data_types::PartitionId;
+
     use super::{super::super::partitions_source::mock::MockPartitionsSource, *};

     #[test]
@@ -58,9 +60,9 @@ mod tests {
     #[tokio::test]
     async fn test_stream() {
         let ids = vec![
-            PartitionId::new(1),
-            PartitionId::new(3),
-            PartitionId::new(2),
+            CompactionJob::new(PartitionId::new(1)),
+            CompactionJob::new(PartitionId::new(3)),
+            CompactionJob::new(PartitionId::new(2)),
         ];
         let stream = OncePartititionStream::new(MockPartitionsSource::new(ids.clone()));
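
The once-then-flatten idiom above deserves a gloss: futures::stream::once drives the async fetch exactly once, and flatten then yields the fetched items one by one. A runnable sketch, with i64 standing in for CompactionJob:

use futures::StreamExt;

async fn fetch() -> Vec<i64> {
    vec![1, 3, 2] // stand-ins for CompactionJob values
}

#[tokio::main]
async fn main() {
    // One async fetch, then each fetched element becomes a stream item.
    let items: Vec<i64> =
        futures::stream::once(async { futures::stream::iter(fetch().await) })
            .flatten()
            .collect()
            .await;
    assert_eq!(items, vec![1, 3, 2]);
}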

View File

@@ -1,7 +1,7 @@
 use std::fmt::Display;

 use async_trait::async_trait;
-use data_types::PartitionId;
+use compactor_scheduler::CompactionJob;
 use observability_deps::tracing::{info, warn};

 use super::PartitionsSource;
@@ -37,7 +37,7 @@ impl<T> PartitionsSource for LoggingPartitionsSourceWrapper<T>
 where
     T: PartitionsSource,
 {
-    async fn fetch(&self) -> Vec<PartitionId> {
+    async fn fetch(&self) -> Vec<CompactionJob> {
         let partitions = self.inner.fetch().await;
         info!(n_partitions = partitions.len(), "Fetch partitions",);
         if partitions.is_empty() {
@@ -49,6 +49,7 @@ where
 #[cfg(test)]
 mod tests {
+    use data_types::PartitionId;
     use test_helpers::tracing::TracingCapture;

     use super::{super::mock::MockPartitionsSource, *};
@@ -74,9 +75,9 @@ mod tests {
     #[tokio::test]
     async fn test_fetch_some() {
-        let p_1 = PartitionId::new(5);
-        let p_2 = PartitionId::new(1);
-        let p_3 = PartitionId::new(12);
+        let p_1 = CompactionJob::new(PartitionId::new(5));
+        let p_2 = CompactionJob::new(PartitionId::new(1));
+        let p_3 = CompactionJob::new(PartitionId::new(12));
         let partitions = vec![p_1, p_2, p_3];

         let source =

View File

@@ -1,7 +1,7 @@
 use std::fmt::Display;

 use async_trait::async_trait;
-use data_types::PartitionId;
+use compactor_scheduler::CompactionJob;
 use metric::{Registry, U64Counter};

 use super::PartitionsSource;
@@ -59,7 +59,7 @@ impl<T> PartitionsSource for MetricsPartitionsSourceWrapper<T>
 where
     T: PartitionsSource,
 {
-    async fn fetch(&self) -> Vec<PartitionId> {
+    async fn fetch(&self) -> Vec<CompactionJob> {
         let partitions = self.inner.fetch().await;
         self.partitions_fetch_counter.inc(1);
         self.partitions_counter.inc(partitions.len() as u64);
@@ -69,6 +69,7 @@ where
 #[cfg(test)]
 mod tests {
+    use data_types::PartitionId;
    use metric::assert_counter;

     use super::{super::mock::MockPartitionsSource, *};
@@ -85,9 +86,9 @@ mod tests {
     async fn test_fetch() {
         let registry = Registry::new();
         let partitions = vec![
-            PartitionId::new(5),
-            PartitionId::new(1),
-            PartitionId::new(12),
+            CompactionJob::new(PartitionId::new(5)),
+            CompactionJob::new(PartitionId::new(1)),
+            CompactionJob::new(PartitionId::new(12)),
         ];
         let source = MetricsPartitionsSourceWrapper::new(
             MockPartitionsSource::new(partitions.clone()),

View File

@@ -1,27 +1,27 @@
 use async_trait::async_trait;
-use data_types::PartitionId;
+use compactor_scheduler::CompactionJob;
 use parking_lot::Mutex;

 use super::PartitionsSource;

-/// A mock structure for providing [partitions](PartitionId).
+/// A mock structure for providing [compaction jobs](CompactionJob).
 #[derive(Debug)]
 pub struct MockPartitionsSource {
-    partitions: Mutex<Vec<PartitionId>>,
+    partitions: Mutex<Vec<CompactionJob>>,
 }

 impl MockPartitionsSource {
     #[allow(dead_code)]
     /// Create a new MockPartitionsSource.
-    pub fn new(partitions: Vec<PartitionId>) -> Self {
+    pub fn new(partitions: Vec<CompactionJob>) -> Self {
         Self {
             partitions: Mutex::new(partitions),
         }
     }

-    /// Set PartitionIds for MockPartitionsSource.
+    /// Set CompactionJobs for MockPartitionsSource.
     #[allow(dead_code)] // not used anywhere
-    pub fn set(&self, partitions: Vec<PartitionId>) {
+    pub fn set(&self, partitions: Vec<CompactionJob>) {
         *self.partitions.lock() = partitions;
     }
 }
@@ -34,13 +34,15 @@ impl std::fmt::Display for MockPartitionsSource {
 #[async_trait]
 impl PartitionsSource for MockPartitionsSource {
-    async fn fetch(&self) -> Vec<PartitionId> {
+    async fn fetch(&self) -> Vec<CompactionJob> {
         self.partitions.lock().clone()
     }
 }

 #[cfg(test)]
 mod tests {
+    use data_types::PartitionId;
+
     use super::*;

     #[test]
@@ -53,9 +55,9 @@ mod tests {
         let source = MockPartitionsSource::new(vec![]);
         assert_eq!(source.fetch().await, vec![],);

-        let p_1 = PartitionId::new(5);
-        let p_2 = PartitionId::new(1);
-        let p_3 = PartitionId::new(12);
+        let p_1 = CompactionJob::new(PartitionId::new(5));
+        let p_2 = CompactionJob::new(PartitionId::new(1));
+        let p_3 = CompactionJob::new(PartitionId::new(12));
         let parts = vec![p_1, p_2, p_3];
         source.set(parts.clone());
         assert_eq!(source.fetch().await, parts,);

View File

@@ -1,6 +1,6 @@
 //! Abstractions that provide functionality over a [`PartitionsSource`] of PartitionIds.
 //!
-//! These abstractions are for actions taken in a compactor using the PartitionIds received from a compactor_scheduler.
+//! These abstractions are for actions taken in a compactor using the CompactionJobs received from a compactor_scheduler.
 pub mod logging;
 pub mod metrics;
 pub mod mock;
@@ -14,9 +14,9 @@ use std::{
 };

 use async_trait::async_trait;
-use data_types::PartitionId;
+use compactor_scheduler::CompactionJob;

-/// A source of partitions, noted by [`PartitionId`](data_types::PartitionId), that may potentially need compacting.
+/// A source of partitions, noted by [`CompactionJob`](compactor_scheduler::CompactionJob), that may potentially need compacting.
 #[async_trait]
 pub trait PartitionsSource: Debug + Display + Send + Sync {
     /// Get partition IDs.
@@ -24,7 +24,7 @@ pub trait PartitionsSource: Debug + Display + Send + Sync {
     /// This method performs retries.
     ///
     /// This should only perform basic, efficient filtering. It MUST NOT inspect individual parquet files.
-    async fn fetch(&self) -> Vec<PartitionId>;
+    async fn fetch(&self) -> Vec<CompactionJob>;
 }

 #[async_trait]
@@ -32,7 +32,7 @@ impl<T> PartitionsSource for Arc<T>
 where
     T: PartitionsSource + ?Sized,
 {
-    async fn fetch(&self) -> Vec<PartitionId> {
+    async fn fetch(&self) -> Vec<CompactionJob> {
         self.as_ref().fetch().await
     }
 }
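
The blanket impl at the end of this hunk is what lets the layered wrappers (logging, metrics, not_empty, randomize_order) share an inner source behind Arc: the Arc itself satisfies the trait, and ?Sized admits trait objects. A self-contained sketch with a stand-in job type; the real trait also requires Debug + Display, omitted here for brevity:

use std::sync::Arc;

use async_trait::async_trait;

// Stand-in for the scheduler's job type (assumed shape).
#[derive(Debug, Clone, PartialEq)]
struct CompactionJob(i64);

#[async_trait]
trait PartitionsSource: Send + Sync {
    async fn fetch(&self) -> Vec<CompactionJob>;
}

// Same shape as the blanket impl in the hunk above.
#[async_trait]
impl<T> PartitionsSource for Arc<T>
where
    T: PartitionsSource + ?Sized,
{
    async fn fetch(&self) -> Vec<CompactionJob> {
        // Forward to the inner source; `?Sized` admits trait objects.
        self.as_ref().fetch().await
    }
}

// A trivial concrete source.
struct Fixed(Vec<CompactionJob>);

#[async_trait]
impl PartitionsSource for Fixed {
    async fn fetch(&self) -> Vec<CompactionJob> {
        self.0.clone()
    }
}

#[tokio::main]
async fn main() {
    // Thanks to the blanket impl, the Arc itself is a PartitionsSource.
    let source: Arc<dyn PartitionsSource> = Arc::new(Fixed(vec![CompactionJob(1)]));
    assert_eq!(source.fetch().await, vec![CompactionJob(1)]);
}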

View File

@@ -1,7 +1,7 @@
 use std::{fmt::Display, sync::Arc, time::Duration};

 use async_trait::async_trait;
-use data_types::PartitionId;
+use compactor_scheduler::CompactionJob;
 use iox_time::TimeProvider;

 use super::PartitionsSource;
@@ -43,7 +43,7 @@ impl<T> PartitionsSource for NotEmptyPartitionsSourceWrapper<T>
 where
     T: PartitionsSource,
 {
-    async fn fetch(&self) -> Vec<PartitionId> {
+    async fn fetch(&self) -> Vec<CompactionJob> {
         loop {
             let res = self.inner.fetch().await;
             if !res.is_empty() {
@@ -57,6 +57,7 @@ where
 #[cfg(test)]
 mod tests {
     use compactor_test_utils::AssertFutureExt;
+    use data_types::PartitionId;
     use iox_time::{MockProvider, Time};

     use super::{super::mock::MockPartitionsSource, *};
@@ -90,7 +91,7 @@ mod tests {
         fut.assert_pending().await;

         // insert data but system is still throttled
-        let p = PartitionId::new(5);
+        let p = CompactionJob::new(PartitionId::new(5));
         let parts = vec![p];
         inner.set(parts.clone());
         fut.assert_pending().await;

View File

@@ -1,7 +1,7 @@
 use std::fmt::Display;

 use async_trait::async_trait;
-use data_types::PartitionId;
+use compactor_scheduler::CompactionJob;
 use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};

 use super::PartitionsSource;
@@ -38,7 +38,7 @@ impl<T> PartitionsSource for RandomizeOrderPartitionsSourcesWrapper<T>
 where
     T: PartitionsSource,
 {
-    async fn fetch(&self) -> Vec<PartitionId> {
+    async fn fetch(&self) -> Vec<CompactionJob> {
         let mut partitions = self.inner.fetch().await;

         let mut rng = StdRng::seed_from_u64(self.seed);
         partitions.shuffle(&mut rng);
@@ -48,6 +48,8 @@ where
 #[cfg(test)]
 mod tests {
+    use data_types::PartitionId;
+
     use super::{super::mock::MockPartitionsSource, *};

     #[test]
@@ -66,21 +68,27 @@ mod tests {
     #[tokio::test]
     async fn test_fetch_some() {
-        let p_1 = PartitionId::new(5);
-        let p_2 = PartitionId::new(1);
-        let p_3 = PartitionId::new(12);
-        let partitions = vec![p_1, p_2, p_3];
+        let p_1 = CompactionJob::new(PartitionId::new(5));
+        let p_2 = CompactionJob::new(PartitionId::new(1));
+        let p_3 = CompactionJob::new(PartitionId::new(12));
+        let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()];

         // shuffles
         let source = RandomizeOrderPartitionsSourcesWrapper::new(
             MockPartitionsSource::new(partitions.clone()),
             123,
         );
-        assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],);
+        assert_eq!(
+            source.fetch().await,
+            vec![p_3.clone(), p_2.clone(), p_1.clone(),],
+        );

         // is deterministic in same source
         for _ in 0..100 {
-            assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],);
+            assert_eq!(
+                source.fetch().await,
+                vec![p_3.clone(), p_2.clone(), p_1.clone(),],
+            );
         }

         // is deterministic with new source
@@ -89,7 +97,10 @@ mod tests {
             MockPartitionsSource::new(partitions.clone()),
             123,
         );
-        assert_eq!(source.fetch().await, vec![p_3, p_2, p_1,],);
+        assert_eq!(
+            source.fetch().await,
+            vec![p_3.clone(), p_2.clone(), p_1.clone(),],
+        );
     }

     // different seed => different output

View File

@@ -2,7 +2,6 @@ use std::sync::Arc;

 use async_trait::async_trait;
 use compactor_scheduler::{CompactionJob, Scheduler};
-use data_types::PartitionId;

 use super::PartitionsSource;
@@ -19,9 +18,8 @@ impl ScheduledPartitionsSource {
 #[async_trait]
 impl PartitionsSource for ScheduledPartitionsSource {
-    async fn fetch(&self) -> Vec<PartitionId> {
-        let job: Vec<CompactionJob> = self.scheduler.get_jobs().await;
-        job.into_iter().map(|job| job.partition_id).collect()
+    async fn fetch(&self) -> Vec<CompactionJob> {
+        self.scheduler.get_jobs().await
     }
 }

View File

@@ -1,6 +1,7 @@
 use std::{num::NonZeroUsize, sync::Arc, time::Duration};

 use chrono::Utc;
+use compactor_scheduler::CompactionJob;
 use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
 use futures::{stream, StreamExt, TryStreamExt};
 use iox_query::exec::query_tracing::send_metrics_to_tracing;
@@ -36,7 +37,7 @@ pub async fn compact(
     components
         .partition_stream
         .stream()
-        .map(|partition_id| {
+        .map(|job| {
             let components = Arc::clone(components);

             // A root span is created for each partition. Later this can be linked to the
@@ -48,7 +49,7 @@
             compact_partition(
                 span,
-                partition_id,
+                job,
                 partition_timeout,
                 Arc::clone(&df_semaphore),
                 components,
@@ -61,11 +62,12 @@
 async fn compact_partition(
     mut span: SpanRecorder,
-    partition_id: PartitionId,
+    job: CompactionJob,
     partition_timeout: Duration,
     df_semaphore: Arc<InstrumentedAsyncSemaphore>,
     components: Arc<Components>,
 ) {
+    let partition_id = job.partition_id;
     info!(partition_id = partition_id.get(), timeout = ?partition_timeout, "compact partition",);
     span.set_metadata("partition_id", partition_id.get().to_string());
     let scratchpad = components.scratchpad_gen.pad();
@@ -76,7 +78,7 @@ async fn compact_partition(
         async {
             try_compact_partition(
                 span,
-                partition_id,
+                job.clone(),
                 df_semaphore,
                 components,
                 scratchpad,
@@ -203,12 +205,13 @@
 /// . Round 2 happens or not depends on the stop condition
 async fn try_compact_partition(
     span: SpanRecorder,
-    partition_id: PartitionId,
+    job: CompactionJob,
     df_semaphore: Arc<InstrumentedAsyncSemaphore>,
     components: Arc<Components>,
     scratchpad_ctx: Arc<dyn Scratchpad>,
     transmit_progress_signal: Sender<bool>,
 ) -> Result<(), DynError> {
+    let partition_id = job.partition_id;
     let mut files = components.partition_files_source.fetch(partition_id).await;
     let partition_info = components.partition_info_source.fetch(partition_id).await?;
     let transmit_progress_signal = Arc::new(transmit_progress_signal);
@@ -269,12 +272,13 @@ async fn try_compact_partition(
             let df_semaphore = Arc::clone(&df_semaphore);
             let transmit_progress_signal = Arc::clone(&transmit_progress_signal);
             let scratchpad = Arc::clone(&scratchpad_ctx);
+            let job = job.clone();
             let branch_span = round_span.child("branch");

             async move {
                 execute_branch(
                     branch_span,
-                    partition_id,
+                    job,
                     branch,
                     df_semaphore,
                     components,
@@ -298,7 +302,7 @@
 #[allow(clippy::too_many_arguments)]
 async fn execute_branch(
     span: SpanRecorder,
-    partition_id: PartitionId,
+    job: CompactionJob,
     branch: Vec<ParquetFile>,
     df_semaphore: Arc<InstrumentedAsyncSemaphore>,
     components: Arc<Components>,
@@ -418,7 +422,7 @@ async fn execute_branch(
     // files and update the upgraded files
     let (created_files, upgraded_files) = update_catalog(
         Arc::clone(&components),
-        partition_id,
+        job.clone(),
         &saved_parquet_file_state,
         files_to_delete,
         upgrade,
@@ -627,13 +631,14 @@
 /// Return created and upgraded files
 async fn update_catalog(
     components: Arc<Components>,
-    partition_id: PartitionId,
+    job: CompactionJob,
     saved_parquet_file_state: &SavedParquetFileState,
     files_to_delete: Vec<ParquetFile>,
     files_to_upgrade: Vec<ParquetFile>,
     file_params_to_create: Vec<ParquetFileParams>,
     target_level: CompactionLevel,
 ) -> Result<(Vec<ParquetFile>, Vec<ParquetFile>), DynError> {
+    let partition_id = job.partition_id;
     let current_parquet_file_state =
         fetch_and_save_parquet_file_state(&components, partition_id).await;
View File

@@ -60,7 +60,7 @@ impl std::fmt::Display for SchedulerConfig {
 }

 /// Job assignment for a given partition.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct CompactionJob {
     #[allow(dead_code)]
     /// Unique identifier for this job.
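
The added PartialEq derive is what lets the updated tests assert_eq! directly over fetched Vec<CompactionJob> values (see the mock, logging, and randomize_order tests above): Vec<T> only supports == when T does. A short illustration with a stand-in struct:

#[derive(Debug, Clone, PartialEq)]
struct CompactionJob {
    partition_id: i64, // stand-in; the real struct wraps a PartitionId
}

fn main() {
    let a = vec![CompactionJob { partition_id: 1 }];
    // Without the PartialEq derive this comparison would not compile.
    assert_eq!(a, a.clone());
}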