Revert "feat(idpe-17789): scheduler job_status() (#8202)" (#8213)

This reverts commit 3dabccd84b.
wiedld 2023-07-11 10:33:56 -07:00 committed by GitHub
parent 3dabccd84b
commit d43300635e
34 changed files with 206 additions and 906 deletions

Cargo.lock (generated)

@ -1001,15 +1001,11 @@ dependencies = [
"async-trait",
"backoff",
"data_types",
"futures",
"iox_catalog",
"iox_tests",
"iox_time",
"itertools 0.11.0",
"metric",
"observability_deps",
"sharder",
"test_helpers",
"tokio",
"uuid",
"workspace-hack",


@ -1,7 +1,7 @@
//! Combinations of multiple components that together can achieve one goal.
pub(crate) mod throttle_partition;
pub(crate) mod unique_partitions;
pub mod throttle_partition;
pub mod unique_partitions;
#[cfg(test)]
mod tests;


@ -1,15 +1,15 @@
use std::{sync::Arc, time::Duration};
use compactor_scheduler::{MockPartitionsSource, PartitionsSource};
use data_types::{CompactionLevel, PartitionId};
use iox_time::{MockProvider, Time};
use crate::{
Commit, MockCommit, MockPartitionDoneSink, MockPartitionsSource, PartitionDoneSink,
PartitionsSource,
use crate::components::{
combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions},
commit::{mock::MockCommit, Commit},
partition_done_sink::{mock::MockPartitionDoneSink, PartitionDoneSink},
};
use super::{throttle_partition::throttle_partition, unique_partitions::unique_partitions};
#[tokio::test]
async fn test_unique_and_throttle() {
let inner_source = Arc::new(MockPartitionsSource::new(vec![


@ -8,11 +8,12 @@ use std::{
};
use async_trait::async_trait;
use compactor_scheduler::PartitionsSource;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use futures::StreamExt;
use iox_time::{Time, TimeProvider};
use crate::{Commit, PartitionDoneSink, PartitionsSource};
use crate::components::{commit::Commit, partition_done_sink::PartitionDoneSink};
/// Ensures that partitions that do not receive any commits are throttled.
///
@ -53,8 +54,8 @@ use crate::{Commit, PartitionDoneSink, PartitionsSource};
/// concurrency of this bypass can be controlled via `bypass_concurrency`.
///
/// This setup relies on a fact that it does not process duplicate [`PartitionId`]. You may use
/// [`unique_partitions`](super::unique_partitions::unique_partitions) to achieve that.
pub(crate) fn throttle_partition<T1, T2, T3>(
/// [`unique_partitions`](crate::components::combos::unique_partitions::unique_partitions) to achieve that.
pub fn throttle_partition<T1, T2, T3>(
source: T1,
commit: T2,
sink: T3,
@ -106,7 +107,7 @@ struct State {
type SharedState = Arc<Mutex<State>>;
#[derive(Debug)]
pub(crate) struct ThrottlePartitionsSourceWrapper<T1, T2>
pub struct ThrottlePartitionsSourceWrapper<T1, T2>
where
T1: PartitionsSource,
T2: PartitionDoneSink,
@ -187,7 +188,7 @@ where
}
#[derive(Debug)]
pub(crate) struct ThrottleCommitWrapper<T>
pub struct ThrottleCommitWrapper<T>
where
T: Commit,
{
@ -240,7 +241,7 @@ where
}
#[derive(Debug)]
pub(crate) struct ThrottlePartitionDoneSinkWrapper<T>
pub struct ThrottlePartitionDoneSinkWrapper<T>
where
T: PartitionDoneSink,
{
@ -295,10 +296,12 @@ where
#[cfg(test)]
mod tests {
use compactor_scheduler::MockPartitionsSource;
use iox_time::MockProvider;
use crate::{
commit::mock::CommitHistoryEntry, MockCommit, MockPartitionDoneSink, MockPartitionsSource,
use crate::components::{
commit::mock::{CommitHistoryEntry, MockCommit},
partition_done_sink::mock::MockPartitionDoneSink,
};
use super::*;


@ -7,10 +7,11 @@ use std::{
};
use async_trait::async_trait;
use compactor_scheduler::PartitionsSource;
use data_types::PartitionId;
use futures::StreamExt;
use crate::{PartitionDoneSink, PartitionsSource};
use crate::components::partition_done_sink::PartitionDoneSink;
/// Ensures that a unique set of partitions is flowing through the critical section of the compactor pipeline.
///
@ -31,7 +32,7 @@ use crate::{PartitionDoneSink, PartitionsSource};
///
/// | Step | Name | Type | Description |
/// | ---- | --------------------- | ----------------------------------------------------------- | ----------- |
/// | 1 | **Actual source** | `inner_source`/`T1`/[`PartitionsSource`], wrapped | This is the actual source, e.g. a [schedule](crate::PartitionsSource) |
/// | 1 | **Actual source** | `inner_source`/`T1`/[`PartitionsSource`], wrapped | This is the actual source, e.g. a [schedule](crate::components::partitions_source::scheduled::ScheduledPartitionsSource) |
/// | 2 | **Unique IDs source** | [`UniquePartionsSourceWrapper`], wraps `inner_source`/`T1` | Outputs that [`PartitionId`]s from the `inner_source` but filters out partitions that have not yet reached the uniqueness sink (step 4) |
/// | 3 | **Critical section** | -- | Here it is always ensured that a single [`PartitionId`] does NOT occur more than once. |
/// | 4 | **Unique IDs sink** | [`UniquePartitionDoneSinkWrapper`], wraps `inner_sink`/`T2` | Observes incoming IDs and removes them from the filter applied in step 2. |
@ -40,7 +41,7 @@ use crate::{PartitionDoneSink, PartitionsSource};
/// Note that partitions filtered out by [`UniquePartionsSourceWrapper`] will directly be forwarded to `inner_sink`. No
/// partition is ever lost. This means that `inner_source` and `inner_sink` can perform proper accounting. The
/// concurrency of this bypass can be controlled via `bypass_concurrency`.
pub(crate) fn unique_partitions<T1, T2>(
pub fn unique_partitions<T1, T2>(
inner_source: T1,
inner_sink: T2,
bypass_concurrency: usize,
@ -70,7 +71,7 @@ where
type InFlight = Arc<Mutex<HashSet<PartitionId>>>;
#[derive(Debug)]
pub(crate) struct UniquePartionsSourceWrapper<T1, T2>
pub struct UniquePartionsSourceWrapper<T1, T2>
where
T1: PartitionsSource,
T2: PartitionDoneSink,
@ -127,7 +128,7 @@ where
}
#[derive(Debug)]
pub(crate) struct UniquePartitionDoneSinkWrapper<T>
pub struct UniquePartitionDoneSinkWrapper<T>
where
T: PartitionDoneSink,
{
@ -179,7 +180,9 @@ where
mod tests {
use std::collections::HashMap;
use crate::{MockPartitionDoneSink, MockPartitionsSource};
use compactor_scheduler::MockPartitionsSource;
use crate::components::partition_done_sink::mock::MockPartitionDoneSink;
use super::*;

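The two combinators above are meant to be chained: unique_partitions guarantees each PartitionId is in flight at most once, and throttle_partition then rate-limits partitions that produced no commit. A rough sketch of that wiring, assuming the signatures and mock types shown elsewhere in this diff (not a verbatim copy of the production setup in hardcoded_components):

use std::{sync::Arc, time::Duration};

use compactor_scheduler::MockPartitionsSource;
use iox_time::{MockProvider, Time};

use crate::components::{
    combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions},
    commit::mock::MockCommit,
    partition_done_sink::mock::MockPartitionDoneSink,
};

fn wire_combos() {
    let source = MockPartitionsSource::new(vec![]);
    let commit = MockCommit::new();
    let sink = MockPartitionDoneSink::new();
    let time_provider = Arc::new(MockProvider::new(Time::MIN));

    // Deduplicate partition IDs flowing into the critical section.
    let (source, sink) = unique_partitions(source, sink, 1);

    // Throttle partitions that saw no commit, re-admitting them after 60s.
    let (_source, _commit, _sink) = throttle_partition(
        source,
        commit,
        sink,
        time_provider,
        Duration::from_secs(60),
        1,
    );
}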

@ -1,55 +0,0 @@
use std::sync::Arc;
use async_trait::async_trait;
use compactor_scheduler::{
Commit, CommitUpdate, CompactionJob, CompactionJobStatus, CompactionJobStatusResult,
CompactionJobStatusVariant, Scheduler,
};
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
#[derive(Debug)]
pub(crate) struct CommitToScheduler {
scheduler: Arc<dyn Scheduler>,
}
impl CommitToScheduler {
pub(crate) fn new(scheduler: Arc<dyn Scheduler>) -> Self {
Self { scheduler }
}
}
#[async_trait]
impl Commit for CommitToScheduler {
async fn commit(
&self,
partition_id: PartitionId,
delete: &[ParquetFile],
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
match self
.scheduler
.job_status(CompactionJobStatus {
job: CompactionJob::new(partition_id),
status: CompactionJobStatusVariant::Update(CommitUpdate::new(
partition_id,
delete.into(),
upgrade.into(),
create.into(),
target_level,
)),
})
.await
{
Ok(CompactionJobStatusResult::UpdatedParquetFiles(ids)) => ids,
_ => panic!("commit failed"),
}
}
}
impl std::fmt::Display for CommitToScheduler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CommitToScheduler")
}
}


@ -5,16 +5,16 @@ use backoff::{Backoff, BackoffConfig};
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use iox_catalog::interface::Catalog;
use crate::Commit;
use super::Commit;
#[derive(Debug)]
pub(crate) struct CatalogCommit {
pub struct CatalogCommit {
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
}
impl CatalogCommit {
pub(crate) fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
pub fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
Self {
backoff_config,
catalog,


@ -7,7 +7,7 @@ use observability_deps::tracing::info;
use super::Commit;
#[derive(Debug)]
pub(crate) struct LoggingCommitWrapper<T>
pub struct LoggingCommitWrapper<T>
where
T: Commit,
{
@ -18,7 +18,7 @@ impl<T> LoggingCommitWrapper<T>
where
T: Commit,
{
pub(crate) fn new(inner: T) -> Self {
pub fn new(inner: T) -> Self {
Self { inner }
}
}
@ -83,7 +83,7 @@ mod tests {
use test_helpers::tracing::TracingCapture;
use super::*;
use crate::commit::mock::{CommitHistoryEntry, MockCommit};
use crate::components::commit::mock::{CommitHistoryEntry, MockCommit};
use iox_tests::ParquetFileBuilder;
#[test]


@ -102,7 +102,7 @@ impl Histogram {
}
#[derive(Debug)]
pub(crate) struct MetricsCommitWrapper<T>
pub struct MetricsCommitWrapper<T>
where
T: Commit,
{
@ -124,7 +124,7 @@ impl<T> MetricsCommitWrapper<T>
where
T: Commit,
{
pub(crate) fn new(inner: T, registry: &Registry) -> Self {
pub fn new(inner: T, registry: &Registry) -> Self {
Self {
file_bytes: Histogram::new(
registry,
@ -307,7 +307,7 @@ mod tests {
use metric::{assert_histogram, Attributes};
use crate::commit::mock::{CommitHistoryEntry, MockCommit};
use crate::components::commit::mock::{CommitHistoryEntry, MockCommit};
use iox_tests::ParquetFileBuilder;
use super::*;


@ -12,23 +12,23 @@ use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams,
use super::Commit;
#[derive(Debug, PartialEq, Eq, Clone)]
pub(crate) struct CommitHistoryEntry {
pub(crate) partition_id: PartitionId,
pub(crate) delete: Vec<ParquetFile>,
pub(crate) upgrade: Vec<ParquetFile>,
pub(crate) created: Vec<ParquetFile>,
pub(crate) target_level: CompactionLevel,
pub struct CommitHistoryEntry {
pub partition_id: PartitionId,
pub delete: Vec<ParquetFile>,
pub upgrade: Vec<ParquetFile>,
pub created: Vec<ParquetFile>,
pub target_level: CompactionLevel,
}
#[derive(Debug, Default)]
pub(crate) struct MockCommit {
#[derive(Debug)]
pub struct MockCommit {
history: Mutex<Vec<CommitHistoryEntry>>,
id_counter: AtomicI64,
}
impl MockCommit {
#[allow(dead_code)] // not used anywhere
pub(crate) fn new() -> Self {
pub fn new() -> Self {
Self {
history: Default::default(),
id_counter: AtomicI64::new(1000),
@ -36,7 +36,7 @@ impl MockCommit {
}
#[allow(dead_code)] // not used anywhere
pub(crate) fn history(&self) -> Vec<CommitHistoryEntry> {
pub fn history(&self) -> Vec<CommitHistoryEntry> {
self.history.lock().expect("not poisoned").clone()
}
}


@ -6,9 +6,10 @@ use std::{
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
pub(crate) mod logging;
pub(crate) mod metrics;
pub(crate) mod mock;
pub mod catalog;
pub mod logging;
pub mod metrics;
pub mod mock;
/// Ensures that the file change (i.e. deletion and creation) are committed to the catalog.
#[async_trait]

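Only the module list of commit/mod.rs is shown above; the Commit trait itself has the shape implied by the CommitToScheduler adapter removed earlier in this diff. A hedged sketch of that shape (supertraits assumed, not a verbatim copy of the real definition):

use std::fmt::{Debug, Display};

use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};

/// Sketch only: the real definition lives in commit/mod.rs.
#[async_trait]
pub trait Commit: Debug + Display + Send + Sync {
    /// Commit deletions, upgrades and creations for one partition, returning
    /// the IDs of the newly created parquet files.
    async fn commit(
        &self,
        partition_id: PartitionId,
        delete: &[ParquetFile],
        upgrade: &[ParquetFile],
        create: &[ParquetFileParams],
        target_level: CompactionLevel,
    ) -> Vec<ParquetFileId>;
}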

@ -4,9 +4,7 @@
use std::{sync::Arc, time::Duration};
use compactor_scheduler::{
create_scheduler, Commit, PartitionDoneSink, PartitionsSource, Scheduler,
};
use compactor_scheduler::{create_scheduler, PartitionsSource, Scheduler};
use data_types::CompactionLevel;
use object_store::memory::InMemory;
@ -14,7 +12,11 @@ use crate::{config::Config, error::ErrorKind, object_store::ignore_writes::Ignor
use super::{
changed_files_filter::logging::LoggingChangedFiles,
commit::CommitToScheduler,
combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions},
commit::{
catalog::CatalogCommit, logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper,
mock::MockCommit, Commit,
},
df_plan_exec::{
dedicated::DedicatedDataFusionPlanExec, noop::NoopDataFusionPlanExec, DataFusionPlanExec,
},
@ -37,8 +39,9 @@ use super::{
},
parquet_files_sink::{dispatch::DispatchParquetFilesSink, ParquetFilesSink},
partition_done_sink::{
error_kind::ErrorKindPartitionDoneSinkWrapper, logging::LoggingPartitionDoneSinkWrapper,
metrics::MetricsPartitionDoneSinkWrapper, outcome::PartitionDoneSinkToScheduler,
catalog::CatalogPartitionDoneSink, error_kind::ErrorKindPartitionDoneSinkWrapper,
logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper,
mock::MockPartitionDoneSink, PartitionDoneSink,
},
partition_files_source::{
catalog::{CatalogPartitionFilesSource, QueryRateLimiter},
@ -90,8 +93,6 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
config.scheduler_config.clone(),
Arc::clone(&config.catalog),
Arc::clone(&config.time_provider),
Arc::clone(&config.metric_registry),
config.shadow_mode,
);
let (partitions_source, commit, partition_done_sink) =
make_partitions_source_commit_partition_sink(config, Arc::clone(&scheduler));
@ -125,14 +126,49 @@ fn make_partitions_source_commit_partition_sink(
Arc<dyn Commit>,
Arc<dyn PartitionDoneSink>,
) {
let partitions_source = ScheduledPartitionsSource::new(Arc::clone(&scheduler));
let partitions_source = ScheduledPartitionsSource::new(scheduler);
let commit = CommitToScheduler::new(Arc::clone(&scheduler));
let partition_done_sink: Arc<dyn PartitionDoneSink> = if config.shadow_mode {
Arc::new(MockPartitionDoneSink::new())
} else {
Arc::new(CatalogPartitionDoneSink::new(
config.backoff_config.clone(),
Arc::clone(&config.catalog),
))
};
let partition_done_sink = PartitionDoneSinkToScheduler::new(Arc::clone(&scheduler));
let commit: Arc<dyn Commit> = if config.shadow_mode {
Arc::new(MockCommit::new())
} else {
Arc::new(CatalogCommit::new(
config.backoff_config.clone(),
Arc::clone(&config.catalog),
))
};
let commit = if let Some(commit_wrapper) = config.commit_wrapper.as_ref() {
commit_wrapper.wrap(commit)
} else {
commit
};
let (partitions_source, partition_done_sink) =
unique_partitions(partitions_source, partition_done_sink, 1);
let (partitions_source, commit, partition_done_sink) = throttle_partition(
partitions_source,
commit,
partition_done_sink,
Arc::clone(&config.time_provider),
Duration::from_secs(60),
1,
);
let commit = Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new(
commit,
&config.metric_registry,
)));
// compactors are responsible for error classification
// and any future decisions regarding graceful shutdown
let partition_done_sink: Arc<dyn PartitionDoneSink> = if config.all_errors_are_fatal {
Arc::new(partition_done_sink)
} else {
@ -149,7 +185,6 @@ fn make_partitions_source_commit_partition_sink(
})
.copied()
.collect(),
scheduler,
))
};
let partition_done_sink = Arc::new(LoggingPartitionDoneSinkWrapper::new(
@ -175,7 +210,7 @@ fn make_partitions_source_commit_partition_sink(
))
};
(partitions_source, Arc::new(commit), partition_done_sink)
(partitions_source, commit, partition_done_sink)
}
fn make_partition_stream(


@ -1,19 +1,19 @@
use std::sync::Arc;
use compactor_scheduler::{Commit, PartitionDoneSink};
use self::{
changed_files_filter::ChangedFilesFilter, df_plan_exec::DataFusionPlanExec,
changed_files_filter::ChangedFilesFilter, commit::Commit, df_plan_exec::DataFusionPlanExec,
df_planner::DataFusionPlanner, divide_initial::DivideInitial, file_classifier::FileClassifier,
ir_planner::IRPlanner, parquet_files_sink::ParquetFilesSink,
partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter,
partition_info_source::PartitionInfoSource, partition_stream::PartitionStream,
partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource,
partition_filter::PartitionFilter, partition_info_source::PartitionInfoSource,
partition_stream::PartitionStream,
post_classification_partition_filter::PostClassificationPartitionFilter,
round_info_source::RoundInfoSource, round_split::RoundSplit, scratchpad::ScratchpadGen,
};
pub mod changed_files_filter;
pub(crate) mod commit;
pub mod combos;
pub mod commit;
pub mod df_plan_exec;
pub mod df_planner;
pub mod divide_initial;
@ -61,7 +61,7 @@ pub struct Components {
pub post_classification_partition_filter: Arc<dyn PostClassificationPartitionFilter>,
/// Records "partition is done" status for given partition.
pub partition_done_sink: Arc<dyn PartitionDoneSink>,
/// Commits changes (i.e. deletion and creation).
/// Commits changes (i.e. deletion and creation) to the catalog.
pub commit: Arc<dyn Commit>,
/// Creates `PlanIR` that describes what files should be compacted and updated
pub ir_planner: Arc<dyn IRPlanner>,


@ -5,16 +5,18 @@ use backoff::{Backoff, BackoffConfig};
use data_types::PartitionId;
use iox_catalog::interface::Catalog;
use super::{DynError, PartitionDoneSink};
use crate::error::DynError;
use super::PartitionDoneSink;
#[derive(Debug)]
pub(crate) struct CatalogPartitionDoneSink {
pub struct CatalogPartitionDoneSink {
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
}
impl CatalogPartitionDoneSink {
pub(crate) fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
pub fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
Self {
backoff_config,
catalog,


@ -1,14 +1,12 @@
use std::{collections::HashSet, fmt::Display, sync::Arc};
use std::{collections::HashSet, fmt::Display};
use async_trait::async_trait;
use compactor_scheduler::{
CompactionJob, CompactionJobStatus, CompactionJobStatusResult, CompactionJobStatusVariant,
ErrorKind as SchedulerErrorKind, PartitionDoneSink, Scheduler,
};
use data_types::PartitionId;
use crate::error::{DynError, ErrorKind, ErrorKindExt};
use super::PartitionDoneSink;
#[derive(Debug)]
pub struct ErrorKindPartitionDoneSinkWrapper<T>
where
@ -16,19 +14,14 @@ where
{
kind: HashSet<ErrorKind>,
inner: T,
scheduler: Arc<dyn Scheduler>,
}
impl<T> ErrorKindPartitionDoneSinkWrapper<T>
where
T: PartitionDoneSink,
{
pub fn new(inner: T, kind: HashSet<ErrorKind>, scheduler: Arc<dyn Scheduler>) -> Self {
Self {
kind,
inner,
scheduler,
}
pub fn new(inner: T, kind: HashSet<ErrorKind>) -> Self {
Self { kind, inner }
}
}
@ -52,24 +45,6 @@ where
match res {
Ok(()) => self.inner.record(partition, Ok(())).await,
Err(e) if self.kind.contains(&e.classify()) => {
let scheduler_error = match SchedulerErrorKind::from(e.classify()) {
SchedulerErrorKind::OutOfMemory => SchedulerErrorKind::OutOfMemory,
SchedulerErrorKind::ObjectStore => SchedulerErrorKind::ObjectStore,
SchedulerErrorKind::Timeout => SchedulerErrorKind::Timeout,
SchedulerErrorKind::Unknown(_) => SchedulerErrorKind::Unknown(e.to_string()),
};
match self
.scheduler
.job_status(CompactionJobStatus {
job: CompactionJob::new(partition),
status: CompactionJobStatusVariant::Error(scheduler_error),
})
.await
{
Ok(CompactionJobStatusResult::Ack) => {}
_ => panic!("unexpected result from scheduler"),
}
self.inner.record(partition, Err(e)).await;
}
_ => {}
@ -81,10 +56,9 @@ where
mod tests {
use std::{collections::HashMap, sync::Arc};
use compactor_scheduler::{create_test_scheduler, MockPartitionDoneSink};
use crate::components::partition_done_sink::mock::MockPartitionDoneSink;
use datafusion::error::DataFusionError;
use iox_tests::TestCatalog;
use iox_time::{MockProvider, Time};
use object_store::Error as ObjectStoreError;
use super::*;
@ -94,11 +68,6 @@ mod tests {
let sink = ErrorKindPartitionDoneSinkWrapper::new(
MockPartitionDoneSink::new(),
HashSet::from([ErrorKind::ObjectStore, ErrorKind::OutOfMemory]),
create_test_scheduler(
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
None,
),
);
assert_eq!(sink.to_string(), "kind([ObjectStore, OutOfMemory], mock)");
}
@ -109,11 +78,6 @@ mod tests {
let sink = ErrorKindPartitionDoneSinkWrapper::new(
Arc::clone(&inner),
HashSet::from([ErrorKind::ObjectStore, ErrorKind::OutOfMemory]),
create_test_scheduler(
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
None,
),
);
sink.record(


@ -1,12 +1,13 @@
use std::fmt::Display;
use async_trait::async_trait;
use compactor_scheduler::PartitionDoneSink;
use data_types::PartitionId;
use observability_deps::tracing::{error, info};
use crate::error::{DynError, ErrorKindExt};
use super::PartitionDoneSink;
#[derive(Debug)]
pub struct LoggingPartitionDoneSinkWrapper<T>
where
@ -60,10 +61,11 @@ where
mod tests {
use std::{collections::HashMap, sync::Arc};
use compactor_scheduler::MockPartitionDoneSink;
use object_store::Error as ObjectStoreError;
use test_helpers::tracing::TracingCapture;
use crate::components::partition_done_sink::mock::MockPartitionDoneSink;
use super::*;
#[test]


@ -1,12 +1,13 @@
use std::{collections::HashMap, fmt::Display};
use async_trait::async_trait;
use compactor_scheduler::PartitionDoneSink;
use data_types::PartitionId;
use metric::{Registry, U64Counter};
use crate::error::{DynError, ErrorKind, ErrorKindExt};
use super::PartitionDoneSink;
const METRIC_NAME_PARTITION_COMPLETE_COUNT: &str = "iox_compactor_partition_complete_count";
#[derive(Debug)]
@ -82,10 +83,11 @@ where
mod tests {
use std::{collections::HashMap, sync::Arc};
use compactor_scheduler::MockPartitionDoneSink;
use metric::{assert_counter, Attributes};
use object_store::Error as ObjectStoreError;
use crate::components::partition_done_sink::mock::MockPartitionDoneSink;
use super::*;
#[test]


@ -3,21 +3,21 @@ use std::{collections::HashMap, fmt::Display, sync::Mutex};
use async_trait::async_trait;
use data_types::PartitionId;
use super::{DynError, PartitionDoneSink};
use crate::error::DynError;
use super::PartitionDoneSink;
/// Mock for [`PartitionDoneSink`].
#[derive(Debug, Default)]
pub struct MockPartitionDoneSink {
last: Mutex<HashMap<PartitionId, Result<(), String>>>,
}
impl MockPartitionDoneSink {
/// Create new mock.
pub fn new() -> Self {
Self::default()
}
/// Get the last recorded results.
#[allow(dead_code)] // not used anywhere
pub fn results(&self) -> HashMap<PartitionId, Result<(), String>> {
self.last.lock().expect("not poisoned").clone()
}


@ -1,4 +1,34 @@
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use data_types::PartitionId;
use crate::error::DynError;
pub mod catalog;
pub mod error_kind;
pub mod logging;
pub mod metrics;
pub mod outcome;
pub mod mock;
/// Records "partition is done" status for given partition.
#[async_trait]
pub trait PartitionDoneSink: Debug + Display + Send + Sync {
/// Record "partition is done" status for given partition.
///
/// This method should retry.
async fn record(&self, partition: PartitionId, res: Result<(), DynError>);
}
#[async_trait]
impl<T> PartitionDoneSink for Arc<T>
where
T: PartitionDoneSink + ?Sized,
{
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) {
self.as_ref().record(partition, res).await
}
}

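The trait above is small enough that a custom sink is only a few lines. A sketch of a hypothetical no-op implementation (illustration only, not part of this change; paths follow the post-revert compactor layout):

use async_trait::async_trait;
use data_types::PartitionId;

use crate::{components::partition_done_sink::PartitionDoneSink, error::DynError};

/// Hypothetical sink for illustration; real sinks record the outcome to the
/// catalog, logs, or metrics as the wrappers in this diff do.
#[derive(Debug, Default)]
struct NoopPartitionDoneSink;

impl std::fmt::Display for NoopPartitionDoneSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "noop")
    }
}

#[async_trait]
impl PartitionDoneSink for NoopPartitionDoneSink {
    async fn record(&self, _partition: PartitionId, _res: Result<(), DynError>) {
        // Deliberately ignore the "partition is done" outcome.
    }
}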

@ -1,49 +0,0 @@
use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use compactor_scheduler::{
CompactionJob, CompactionJobStatus, CompactionJobStatusResult, CompactionJobStatusVariant,
PartitionDoneSink, Scheduler, SkipReason,
};
use data_types::PartitionId;
use crate::DynError;
#[derive(Debug)]
pub struct PartitionDoneSinkToScheduler {
scheduler: Arc<dyn Scheduler>,
}
impl PartitionDoneSinkToScheduler {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
Self { scheduler }
}
}
impl Display for PartitionDoneSinkToScheduler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PartitionDoneSinkToScheduler")
}
}
#[async_trait]
impl PartitionDoneSink for PartitionDoneSinkToScheduler {
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) {
let mut job_status = CompactionJobStatus {
job: CompactionJob::new(partition),
status: CompactionJobStatusVariant::Complete,
};
if let Err(e) = res {
job_status = CompactionJobStatus {
job: CompactionJob::new(partition),
status: CompactionJobStatusVariant::RequestToSkip(SkipReason::CompactionError(
e.to_string(),
)),
};
};
match self.scheduler.job_status(job_status).await {
Ok(CompactionJobStatusResult::Ack) => {}
_ => panic!("unexpected result from scheduler"),
}
}
}


@ -32,6 +32,7 @@ pub fn log_config(config: &Config) {
min_num_l1_files_to_compact,
process_once,
parquet_files_sink_override,
commit_wrapper,
simulate_without_object_store,
all_errors_are_fatal,
max_num_columns_per_table,
@ -44,6 +45,8 @@ pub fn log_config(config: &Config) {
.map(|_| "Some")
.unwrap_or("None");
let commit_wrapper = commit_wrapper.as_ref().map(|_| "Some").unwrap_or("None");
info!(
%catalog,
%scheduler_config,
@ -66,6 +69,7 @@ pub fn log_config(config: &Config) {
process_once,
simulate_without_object_store,
%parquet_files_sink_override,
%commit_wrapper,
all_errors_are_fatal,
max_num_columns_per_table,
max_num_files_per_plan,


@ -8,7 +8,7 @@ use iox_query::exec::Executor;
use iox_time::TimeProvider;
use parquet_file::storage::ParquetStorage;
use crate::components::parquet_files_sink::ParquetFilesSink;
use crate::components::{commit::CommitWrapper, parquet_files_sink::ParquetFilesSink};
/// Multiple from `max_desired_file_size_bytes` to compute the minimum value for
/// `max_compact_size_bytes`. Since `max_desired_file_size_bytes` is softly enforced, actual file
@ -119,6 +119,11 @@ pub struct Config {
/// (used for testing)
pub parquet_files_sink_override: Option<Arc<dyn ParquetFilesSink>>,
/// Optionally wrap the `Commit` instance
///
/// This is mostly used for testing
pub commit_wrapper: Option<Arc<dyn CommitWrapper>>,
/// Ensure that ALL errors (including object store errors) result in "skipped" partitions.
///
/// This is mostly useful for testing.

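CommitWrapper gives tests a hook to observe or decorate the Commit instance before it is installed into the components. A minimal pass-through sketch, following the wrap signature used by the test wrapper elsewhere in this diff (the struct name here is purely illustrative):

use std::sync::Arc;

use compactor::{Commit, CommitWrapper};

/// Illustrative wrapper that returns the commit unchanged; a real wrapper
/// (e.g. the test-run-log recorder in compactor_test_utils) decorates it.
#[derive(Debug)]
struct PassThroughCommitWrapper;

impl CommitWrapper for PassThroughCommitWrapper {
    fn wrap(&self, commit: Arc<dyn Commit>) -> Arc<dyn Commit> {
        commit
    }
}

The wrapped commit is then installed via the new `commit_wrapper` field on `Config`, as the test setup later in this diff does with its own wrapper.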

@ -1,6 +1,5 @@
//! Error handling.
use compactor_scheduler::ErrorKind as SchedulerErrorKind;
use datafusion::{arrow::error::ArrowError, error::DataFusionError, parquet::errors::ParquetError};
use object_store::Error as ObjectStoreError;
use std::{error::Error, fmt::Display, sync::Arc};
@ -52,17 +51,6 @@ impl ErrorKind {
}
}
impl From<ErrorKind> for SchedulerErrorKind {
fn from(e: ErrorKind) -> Self {
match e {
ErrorKind::ObjectStore => Self::ObjectStore,
ErrorKind::OutOfMemory => Self::OutOfMemory,
ErrorKind::Timeout => Self::Timeout,
ErrorKind::Unknown => Self::Unknown("".into()),
}
}
}
impl Display for ErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name())


@ -221,8 +221,12 @@ mod round_info;
// publicly expose items needed for testing
pub use components::{
df_planner::panic::PanicDataFusionPlanner, hardcoded::hardcoded_components,
namespaces_source::mock::NamespaceWrapper, parquet_files_sink::ParquetFilesSink, Components,
commit::{Commit, CommitWrapper},
df_planner::panic::PanicDataFusionPlanner,
hardcoded::hardcoded_components,
namespaces_source::mock::NamespaceWrapper,
parquet_files_sink::ParquetFilesSink,
Components,
};
pub use driver::compact;
pub use error::DynError;


@ -9,11 +9,8 @@ license.workspace = true
async-trait = "0.1.71"
backoff = { path = "../backoff" }
data_types = { path = "../data_types" }
futures = "0.3"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
itertools = "0.11.0"
metric = { path = "../metric" }
observability_deps = { path = "../observability_deps" }
sharder = { path = "../sharder" }
uuid = { version = "1", features = ["v4"] }
@ -21,5 +18,4 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
iox_tests = { path = "../iox_tests" }
test_helpers = { path = "../test_helpers"}
tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }


@ -1,37 +0,0 @@
//! Error classification.
/// What kind of error did we encounter during compaction?
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ErrorKind {
/// Could not access the object store.
ObjectStore,
/// We ran out of memory (OOM).
OutOfMemory,
/// Partition took too long.
Timeout,
/// Unknown/unexpected error.
///
/// This will likely mark the affected partition as "skipped" and the compactor will no longer touch it.
Unknown(String),
}
impl ErrorKind {
/// Return static name.
pub fn name(&self) -> &'static str {
match self {
Self::ObjectStore => "object_store",
Self::OutOfMemory => "out_of_memory",
Self::Timeout => "timeout",
Self::Unknown(_) => "unknown",
}
}
}
impl std::fmt::Display for ErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name())
}
}


@ -26,26 +26,16 @@ use iox_time::TimeProvider;
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
pub(crate) mod commit;
pub(crate) use commit::mock::MockCommit;
pub use commit::{Commit, CommitWrapper};
mod error;
pub use error::ErrorKind;
mod local_scheduler;
pub(crate) use local_scheduler::{id_only_partition_filter::IdOnlyPartitionFilter, LocalScheduler};
// configurations used externally during scheduler setup
pub use local_scheduler::{
partition_done_sink::{mock::MockPartitionDoneSink, PartitionDoneSink},
partitions_source_config::PartitionsSourceConfig,
shard_config::ShardConfig,
partitions_source_config::PartitionsSourceConfig, shard_config::ShardConfig,
LocalSchedulerConfig,
};
// partitions_source trait
mod partitions_source;
pub use partitions_source::*;
// scheduler trait and associated types
mod scheduler;
pub use scheduler::*;
@ -59,8 +49,6 @@ pub fn create_scheduler(
config: SchedulerConfig,
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
metrics: Arc<metric::Registry>,
shadow_mode: bool,
) -> Arc<dyn Scheduler> {
match config {
SchedulerConfig::Local(scheduler_config) => {
@ -69,8 +57,6 @@ pub fn create_scheduler(
BackoffConfig::default(),
catalog,
time_provider,
metrics,
shadow_mode,
);
Arc::new(scheduler)
}
@ -89,20 +75,13 @@ pub fn create_test_scheduler(
let scheduler_config = match mocked_partition_ids {
None => SchedulerConfig::default(),
Some(partition_ids) => SchedulerConfig::Local(LocalSchedulerConfig {
commit_wrapper: None,
partitions_source_config: PartitionsSourceConfig::Fixed(
partition_ids.into_iter().collect::<HashSet<PartitionId>>(),
),
shard_config: None,
}),
};
create_scheduler(
scheduler_config,
catalog,
time_provider,
Arc::new(metric::Registry::default()),
false,
)
create_scheduler(scheduler_config, catalog, time_provider)
}
#[cfg(test)]
@ -132,7 +111,7 @@ mod tests {
}
#[tokio::test]
async fn test_test_scheduler_with_mocked_partition_ids() {
async fn test_test_scheduler_with_mocked_parition_ids() {
let partitions = vec![PartitionId::new(0), PartitionId::new(1234242)];
let scheduler = create_test_scheduler(

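After this revert, create_scheduler takes only the config, catalog, and time provider, and create_test_scheduler forwards straight to it. A usage sketch based on the test shown above (assuming the mocked-partition-IDs path):

use std::sync::Arc;

use compactor_scheduler::create_test_scheduler;
use data_types::PartitionId;
use iox_tests::TestCatalog;
use iox_time::{MockProvider, Time};

async fn example() {
    // With fixed partition IDs, the local scheduler hands out exactly these jobs.
    let scheduler = create_test_scheduler(
        TestCatalog::new().catalog(),
        Arc::new(MockProvider::new(Time::MIN)),
        Some(vec![PartitionId::new(0), PartitionId::new(1234242)]),
    );

    let jobs = scheduler.get_jobs().await;
    assert_eq!(jobs.len(), 2);
}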

@ -1,36 +1,26 @@
//! Internals used by [`LocalScheduler`].
pub(crate) mod catalog_commit;
pub(crate) mod combos;
pub(crate) mod id_only_partition_filter;
pub(crate) mod partition_done_sink;
pub(crate) mod partitions_source;
pub(crate) mod partitions_source_config;
pub(crate) mod shard_config;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use async_trait::async_trait;
use backoff::BackoffConfig;
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use observability_deps::tracing::{info, warn};
use observability_deps::tracing::info;
use crate::{
commit::{logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper},
Commit, CommitUpdate, CommitWrapper, CompactionJob, CompactionJobStatus,
CompactionJobStatusResult, CompactionJobStatusVariant, MockCommit, MockPartitionsSource,
PartitionsSource, PartitionsSourceConfig, Scheduler, ShardConfig, SkipReason,
CompactionJob, MockPartitionsSource, PartitionsSource, PartitionsSourceConfig, Scheduler,
ShardConfig,
};
use self::{
catalog_commit::CatalogCommit,
combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions},
id_only_partition_filter::{
and::AndIdOnlyPartitionFilter, shard::ShardPartitionFilter, IdOnlyPartitionFilter,
},
partition_done_sink::{
catalog::CatalogPartitionDoneSink, mock::MockPartitionDoneSink, PartitionDoneSink,
},
partitions_source::{
catalog_all::CatalogAllPartitionsSource,
catalog_to_compact::CatalogToCompactPartitionsSource,
@ -41,10 +31,6 @@ use self::{
/// Configuration specific to the local scheduler.
#[derive(Debug, Default, Clone)]
pub struct LocalSchedulerConfig {
/// Optionally wrap the `Commit` instance
///
/// This is mostly used for testing
pub commit_wrapper: Option<Arc<dyn CommitWrapper>>,
/// The partitions source config used by the local scheduler.
pub partitions_source_config: PartitionsSourceConfig,
/// The shard config used by the local scheduler.
@ -54,14 +40,8 @@ pub struct LocalSchedulerConfig {
/// Implementation of the scheduler for local (per compactor) scheduling.
#[derive(Debug)]
pub(crate) struct LocalScheduler {
/// Commits changes (i.e. deletion and creation) to the catalog
pub(crate) commit: Arc<dyn Commit>,
/// The partitions source to use for scheduling.
partitions_source: Arc<dyn PartitionsSource>,
/// The actions to take when a partition is done.
///
/// Includes partition (PartitionId) tracking of uniqueness and throttling.
partition_done_sink: Arc<dyn PartitionDoneSink>,
/// The shard config used for generating the PartitionsSource.
shard_config: Option<ShardConfig>,
}
@ -73,47 +53,7 @@ impl LocalScheduler {
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
metrics: Arc<metric::Registry>,
shadow_mode: bool,
) -> Self {
let commit = Self::build_commit(
config.clone(),
backoff_config.clone(),
Arc::clone(&catalog),
metrics,
shadow_mode,
);
let partitions_source = Self::build_partitions_source(
config.clone(),
backoff_config.clone(),
Arc::clone(&catalog),
Arc::clone(&time_provider),
);
let (partitions_source, commit, partition_done_sink) = Self::build_partition_done_sink(
partitions_source,
commit,
backoff_config,
catalog,
time_provider,
shadow_mode,
);
Self {
commit,
partitions_source,
partition_done_sink,
shard_config: config.shard_config,
}
}
fn build_partitions_source(
config: LocalSchedulerConfig,
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
) -> Arc<dyn PartitionsSource> {
let shard_config = config.shard_config;
let partitions_source: Arc<dyn PartitionsSource> = match &config.partitions_source_config {
PartitionsSourceConfig::CatalogRecentWrites { threshold } => {
@ -146,75 +86,16 @@ impl LocalScheduler {
shard_config.shard_id,
)));
}
Arc::new(FilterPartitionsSourceWrapper::new(
AndIdOnlyPartitionFilter::new(id_only_partition_filters),
let partitions_source: Arc<dyn PartitionsSource> =
Arc::new(FilterPartitionsSourceWrapper::new(
AndIdOnlyPartitionFilter::new(id_only_partition_filters),
partitions_source,
));
Self {
partitions_source,
))
}
fn build_partition_done_sink(
partitions_source: Arc<dyn PartitionsSource>,
commit: Arc<dyn Commit>,
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
shadow_mode: bool,
) -> (
Arc<dyn PartitionsSource>,
Arc<dyn Commit>,
Arc<dyn PartitionDoneSink>,
) {
let partition_done_sink: Arc<dyn PartitionDoneSink> = if shadow_mode {
Arc::new(MockPartitionDoneSink::new())
} else {
Arc::new(CatalogPartitionDoneSink::new(
backoff_config,
Arc::clone(&catalog),
))
};
let (partitions_source, partition_done_sink) =
unique_partitions(partitions_source, partition_done_sink, 1);
let (partitions_source, commit, partition_done_sink) = throttle_partition(
partitions_source,
commit,
partition_done_sink,
Arc::clone(&time_provider),
Duration::from_secs(60),
1,
);
(
Arc::new(partitions_source),
Arc::new(commit),
Arc::new(partition_done_sink),
)
}
fn build_commit(
config: LocalSchedulerConfig,
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
metrics_registry: Arc<metric::Registry>,
shadow_mode: bool,
) -> Arc<dyn Commit> {
let commit: Arc<dyn Commit> = if shadow_mode {
Arc::new(MockCommit::new())
} else {
Arc::new(CatalogCommit::new(backoff_config, Arc::clone(&catalog)))
};
let commit = if let Some(commit_wrapper) = &config.commit_wrapper {
commit_wrapper.wrap(commit)
} else {
commit
};
Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new(
commit,
&metrics_registry,
)))
shard_config,
}
}
}
@ -228,48 +109,6 @@ impl Scheduler for LocalScheduler {
.map(CompactionJob::new)
.collect()
}
async fn job_status(
&self,
job_status: CompactionJobStatus,
) -> Result<CompactionJobStatusResult, Box<dyn std::error::Error>> {
match job_status.status {
CompactionJobStatusVariant::Update(commit_update) => {
let CommitUpdate {
partition_id,
delete,
upgrade,
target_level,
create,
} = commit_update;
let result = self
.commit
.commit(partition_id, &delete, &upgrade, &create, target_level)
.await;
// verify create commit counts
assert_eq!(result.len(), create.len());
Ok(CompactionJobStatusResult::UpdatedParquetFiles(result))
}
CompactionJobStatusVariant::RequestToSkip(SkipReason::CompactionError(msg)) => {
self.partition_done_sink
.record(job_status.job.partition_id, Err(msg.into()))
.await;
Ok(CompactionJobStatusResult::Ack)
}
CompactionJobStatusVariant::Error(error_kind) => {
warn!("Error processing job: {:?}: {}", job_status.job, error_kind);
Ok(CompactionJobStatusResult::Ack)
}
CompactionJobStatusVariant::Complete => {
// TODO: once uuid is handled properly, we can track the job completion
Ok(CompactionJobStatusResult::Ack)
}
}
}
}
impl std::fmt::Display for LocalScheduler {
@ -283,10 +122,7 @@ impl std::fmt::Display for LocalScheduler {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use data_types::{ColumnType, PartitionId};
use iox_tests::{ParquetFileBuilder, TestCatalog, TestParquetFile, TestParquetFileBuilder};
use iox_tests::TestCatalog;
use iox_time::{MockProvider, Time};
use super::*;
@ -298,8 +134,6 @@ mod tests {
BackoffConfig::default(),
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
Arc::new(metric::Registry::default()),
false,
);
assert_eq!(scheduler.to_string(), "local_compaction_scheduler",);
@ -313,7 +147,6 @@ mod tests {
});
let config = LocalSchedulerConfig {
commit_wrapper: None,
partitions_source_config: PartitionsSourceConfig::default(),
shard_config,
};
@ -323,8 +156,6 @@ mod tests {
BackoffConfig::default(),
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
Arc::new(metric::Registry::default()),
false,
);
assert_eq!(
@ -332,245 +163,4 @@ mod tests {
"local_compaction_scheduler(shard_cfg(n_shards=2,shard_id=1))",
);
}
async fn create_scheduler_with_partitions() -> (LocalScheduler, TestParquetFile, TestParquetFile)
{
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_with_retention("ns", None).await;
let table = ns.create_table("table1").await;
table.create_column("time", ColumnType::Time).await;
table.create_column("load", ColumnType::F64).await;
let partition1 = table.create_partition("k").await;
let partition2 = table.create_partition("k").await;
let partition_ids = vec![partition1.partition.id, partition2.partition.id];
// two files on partition1, to be replaced by one compacted file
let file_builder = TestParquetFileBuilder::default().with_line_protocol("table1 load=1 11");
let file1_1 = partition1.create_parquet_file(file_builder.clone()).await;
let file1_2 = partition1.create_parquet_file(file_builder).await;
let config = LocalSchedulerConfig {
commit_wrapper: None,
partitions_source_config: PartitionsSourceConfig::Fixed(
partition_ids.into_iter().collect::<HashSet<PartitionId>>(),
),
shard_config: None,
};
let scheduler = LocalScheduler::new(
config,
BackoffConfig::default(),
catalog.catalog(),
Arc::new(MockProvider::new(Time::MIN)),
Arc::new(metric::Registry::default()),
false,
);
(scheduler, file1_1, file1_2)
}
#[tokio::test]
#[should_panic]
async fn test_status_update_none_should_panic() {
test_helpers::maybe_start_logging();
let (scheduler, _, _) = create_scheduler_with_partitions().await;
let jobs = scheduler.get_jobs().await;
for job in jobs {
let commit_update = CommitUpdate {
partition_id: job.partition_id,
delete: vec![],
upgrade: vec![],
target_level: data_types::CompactionLevel::Final,
create: vec![],
};
let _ = scheduler
.job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::Update(commit_update),
})
.await;
}
}
#[tokio::test]
async fn test_status_update_replacement() {
test_helpers::maybe_start_logging();
let (scheduler, existing_1, existing_2) = create_scheduler_with_partitions().await;
let jobs = scheduler.get_jobs().await;
let job = jobs
.into_iter()
.find(|job| job.partition_id == existing_1.partition.partition.id)
.unwrap();
let created = ParquetFileBuilder::new(1002)
.with_partition(job.partition_id.get())
.build();
let commit_update = CommitUpdate {
partition_id: job.partition_id,
delete: vec![existing_1.into(), existing_2.into()],
upgrade: vec![],
target_level: data_types::CompactionLevel::Final,
create: vec![created.into()],
};
assert!(scheduler
.job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::Update(commit_update),
})
.await
.is_ok());
}
#[tokio::test]
#[should_panic]
async fn test_status_update_replacement_args_incomplete() {
test_helpers::maybe_start_logging();
let (scheduler, _, _) = create_scheduler_with_partitions().await;
let jobs = scheduler.get_jobs().await;
for job in jobs {
let created_1 = ParquetFileBuilder::new(1002)
.with_partition(job.partition_id.get())
.build();
let commit_update = CommitUpdate {
partition_id: job.partition_id,
delete: vec![],
upgrade: vec![],
target_level: data_types::CompactionLevel::Final,
create: vec![created_1.into()],
};
let _ = scheduler
.job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::Update(commit_update),
})
.await;
}
}
#[tokio::test]
async fn test_status_update_upgrade() {
test_helpers::maybe_start_logging();
let (scheduler, existing_1, existing_2) = create_scheduler_with_partitions().await;
let jobs = scheduler.get_jobs().await;
let job = jobs
.into_iter()
.find(|job| job.partition_id == existing_1.partition.partition.id)
.unwrap();
let commit_update = CommitUpdate {
partition_id: job.partition_id,
delete: vec![],
upgrade: vec![existing_1.into(), existing_2.into()],
target_level: data_types::CompactionLevel::Final,
create: vec![],
};
assert!(scheduler
.job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::Update(commit_update),
})
.await
.is_ok());
}
#[tokio::test]
async fn test_status_update_can_replace_and_upgrade_at_once() {
test_helpers::maybe_start_logging();
let (scheduler, existing_1, existing_2) = create_scheduler_with_partitions().await;
let jobs = scheduler.get_jobs().await;
let job = jobs
.into_iter()
.find(|job| job.partition_id == existing_1.partition.partition.id)
.unwrap();
let created = ParquetFileBuilder::new(1002)
.with_partition(job.partition_id.get())
.build();
let commit_update = CommitUpdate {
partition_id: job.partition_id,
delete: vec![existing_1.into()],
upgrade: vec![existing_2.into()],
target_level: data_types::CompactionLevel::Final,
create: vec![created.into()],
};
assert!(scheduler
.job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::Update(commit_update),
})
.await
.is_ok());
}
#[tokio::test]
async fn test_status_skip() {
test_helpers::maybe_start_logging();
let (scheduler, _, _) = create_scheduler_with_partitions().await;
let jobs = scheduler.get_jobs().await;
for job in jobs {
assert!(scheduler
.job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::RequestToSkip(SkipReason::CompactionError(
"some error".into()
)),
})
.await
.is_ok());
}
}
#[tokio::test]
async fn test_status_error() {
test_helpers::maybe_start_logging();
let (scheduler, _, _) = create_scheduler_with_partitions().await;
let jobs = scheduler.get_jobs().await;
for job in jobs {
assert!(scheduler
.job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::Error(crate::ErrorKind::OutOfMemory),
})
.await
.is_ok());
}
}
#[tokio::test]
async fn test_status_complete() {
test_helpers::maybe_start_logging();
let (scheduler, _, _) = create_scheduler_with_partitions().await;
let jobs = scheduler.get_jobs().await;
for job in jobs {
assert!(scheduler
.job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::Complete,
})
.await
.is_ok());
}
}
}

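Most of the removal above is the job_status handler and its tests. For context, the pre-revert call pattern being deleted looked roughly like the following sketch (types as they appear in the removed code; none of this exists after the revert):

use compactor_scheduler::{
    CompactionJobStatus, CompactionJobStatusResult, CompactionJobStatusVariant, Scheduler,
    SkipReason,
};

/// Report each job as either complete or to-be-skipped, mirroring the
/// PartitionDoneSinkToScheduler removed earlier in this diff.
async fn finish_or_skip(scheduler: &dyn Scheduler, res: Result<(), String>) {
    for job in scheduler.get_jobs().await {
        let status = match &res {
            Ok(()) => CompactionJobStatusVariant::Complete,
            Err(msg) => CompactionJobStatusVariant::RequestToSkip(
                SkipReason::CompactionError(msg.clone()),
            ),
        };
        match scheduler.job_status(CompactionJobStatus { job, status }).await {
            Ok(CompactionJobStatusResult::Ack) => {}
            _ => panic!("unexpected result from scheduler"),
        }
    }
}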

@ -1,32 +0,0 @@
pub(crate) mod catalog;
pub(crate) mod mock;
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use data_types::PartitionId;
/// Dynamic error type that is used throughout the stack.
pub(crate) type DynError = Box<dyn std::error::Error + Send + Sync>;
/// Records "partition is done" status for given partition.
#[async_trait]
pub trait PartitionDoneSink: Debug + Display + Send + Sync {
/// Record "partition is done" status for given partition.
///
/// This method should retry.
async fn record(&self, partition: PartitionId, res: Result<(), DynError>);
}
#[async_trait]
impl<T> PartitionDoneSink for Arc<T>
where
T: PartitionDoneSink + ?Sized,
{
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) {
self.as_ref().record(partition, res).await
}
}


@ -1,13 +1,10 @@
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use std::fmt::{Debug, Display};
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use data_types::PartitionId;
use uuid::Uuid;
use crate::{CommitWrapper, ErrorKind, LocalSchedulerConfig, PartitionsSourceConfig};
use crate::LocalSchedulerConfig;
/// Scheduler configuration.
#[derive(Debug, Clone)]
@ -16,19 +13,6 @@ pub enum SchedulerConfig {
Local(LocalSchedulerConfig),
}
impl SchedulerConfig {
/// Create new [`LocalScheduler`](crate::LocalScheduler) config with a [`CommitWrapper`].
///
/// This is useful for testing.
pub fn new_local_with_wrapper(commit_wrapper: Arc<dyn CommitWrapper>) -> Self {
Self::Local(LocalSchedulerConfig {
shard_config: None,
partitions_source_config: PartitionsSourceConfig::default(),
commit_wrapper: Some(commit_wrapper),
})
}
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self::Local(LocalSchedulerConfig::default())
@ -39,21 +23,11 @@ impl std::fmt::Display for SchedulerConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SchedulerConfig::Local(LocalSchedulerConfig {
commit_wrapper,
shard_config,
partitions_source_config: _,
}) => match (&shard_config, commit_wrapper) {
(None, None) => write!(f, "local_compaction_scheduler_cfg"),
(Some(shard_config), None) => {
write!(f, "local_compaction_scheduler_cfg({shard_config})",)
}
(Some(shard_config), Some(_)) => write!(
f,
"local_compaction_scheduler_cfg({shard_config},commit_wrapper=Some)",
),
(None, Some(_)) => {
write!(f, "local_compaction_scheduler_cfg(commit_wrapper=Some)",)
}
}) => match &shard_config {
None => write!(f, "local_compaction_scheduler"),
Some(shard_config) => write!(f, "local_compaction_scheduler({shard_config})",),
},
}
}
@ -80,115 +54,9 @@ impl CompactionJob {
}
}
/// Commit update for a given partition.
#[derive(Debug)]
pub struct CommitUpdate {
/// Partition to be updated.
pub(crate) partition_id: PartitionId,
/// Files to be deleted.
pub(crate) delete: Vec<ParquetFile>,
/// Files to be upgraded.
pub(crate) upgrade: Vec<ParquetFile>,
/// Target level for upgraded files.
pub(crate) target_level: CompactionLevel,
/// Files to be created.
pub(crate) create: Vec<ParquetFileParams>,
}
impl CommitUpdate {
/// Create new commit update.
pub fn new(
partition_id: PartitionId,
delete: Vec<ParquetFile>,
upgrade: Vec<ParquetFile>,
create: Vec<ParquetFileParams>,
target_level: CompactionLevel,
) -> Self {
Self {
partition_id,
delete,
upgrade,
target_level,
create,
}
}
}
/// Reason for skipping a partition.
#[derive(Debug)]
pub enum SkipReason {
/// Partition is not compactible, due to an encountered error.
CompactionError(String),
}
/// Status.
#[derive(Debug)]
pub enum CompactionJobStatusVariant {
/// Updates associated with ongoing compaction job.
Update(CommitUpdate),
/// Request to skip partition.
RequestToSkip(SkipReason),
/// Compaction job is complete.
Complete,
/// Compaction job has failed.
Error(ErrorKind),
}
/// Status ([`CompactionJobStatusVariant`]) associated with a [`CompactionJob`].
#[derive(Debug)]
pub struct CompactionJobStatus {
/// Job.
pub job: CompactionJob,
/// Status.
pub status: CompactionJobStatusVariant,
}
/// Status of a compaction job.
#[derive(Debug)]
pub enum CompactionJobStatusResult {
/// Ack only.
Ack,
/// Updates which were processed.
UpdatedParquetFiles(Vec<ParquetFileId>),
}
/// Core trait used for all schedulers.
#[async_trait]
pub trait Scheduler: Send + Sync + Debug + Display {
/// Get partitions to be compacted.
async fn get_jobs(&self) -> Vec<CompactionJob>;
/// Update job status.
async fn job_status(
&self,
job_status: CompactionJobStatus,
) -> Result<CompactionJobStatusResult, Box<dyn std::error::Error>>;
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::Commit;
use super::*;
#[test]
fn test_cfg_display_new_local_with_wrapper() {
#[derive(Debug)]
struct MockCommitWrapper;
impl CommitWrapper for MockCommitWrapper {
fn wrap(&self, commit: Arc<dyn Commit>) -> Arc<dyn Commit> {
commit
}
}
let config = SchedulerConfig::new_local_with_wrapper(Arc::new(MockCommitWrapper));
assert_eq!(
config.to_string(),
"local_compaction_scheduler_cfg(commit_wrapper=Some)"
);
}
}


@ -1,7 +1,7 @@
//! Handles recording commit information to the test run log
use async_trait::async_trait;
use compactor_scheduler::{Commit, CommitWrapper};
use compactor::{Commit, CommitWrapper};
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use std::{
fmt::{Debug, Display},


@ -121,7 +121,7 @@ impl TestSetupBuilder<false> {
let config = Config {
metric_registry: catalog.metric_registry(),
catalog: catalog.catalog(),
scheduler_config: SchedulerConfig::new_local_with_wrapper(Arc::new(commit_wrapper)),
scheduler_config: SchedulerConfig::default(),
parquet_store_real: catalog.parquet_store.clone(),
parquet_store_scratchpad: ParquetStorage::new(
Arc::new(object_store::memory::InMemory::new()),
@ -144,6 +144,7 @@ impl TestSetupBuilder<false> {
process_once: true,
simulate_without_object_store: false,
parquet_files_sink_override: None,
commit_wrapper: Some(Arc::new(commit_wrapper)),
all_errors_are_fatal: true,
max_num_columns_per_table: 200,
max_num_files_per_plan: 200,


@ -184,6 +184,7 @@ pub async fn create_compactor_server_type(
process_once: compactor_config.process_once,
simulate_without_object_store: false,
parquet_files_sink_override: None,
commit_wrapper: None,
all_errors_are_fatal: false,
max_num_columns_per_table: compactor_config.max_num_columns_per_table,
max_num_files_per_plan: compactor_config.max_num_files_per_plan,


@ -66,7 +66,6 @@ fn convert_shard_config(config: ShardConfigForLocalScheduler) -> Option<ShardCon
pub(crate) fn convert_scheduler_config(config: CompactorSchedulerConfig) -> SchedulerConfig {
match config.compactor_scheduler_type {
CompactorSchedulerType::Local => SchedulerConfig::Local(LocalSchedulerConfig {
commit_wrapper: None,
partitions_source_config: convert_partitions_source_config(
config.partition_source_config,
),