diff --git a/Cargo.lock b/Cargo.lock index 0d8150c3b1..4b51087ddf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1011,7 +1011,6 @@ version = "0.1.0" dependencies = [ "async-trait", "backoff", - "clap_blocks", "data_types", "iox_catalog", "iox_tests", @@ -1019,6 +1018,7 @@ dependencies = [ "observability_deps", "sharder", "tokio", + "uuid", "workspace-hack", ] @@ -3008,6 +3008,7 @@ dependencies = [ "clap_blocks", "compactor", "compactor_scheduler", + "data_types", "hyper", "iox_catalog", "iox_query", diff --git a/clap_blocks/src/compactor.rs b/clap_blocks/src/compactor.rs index 35ce013788..5e125869c6 100644 --- a/clap_blocks/src/compactor.rs +++ b/clap_blocks/src/compactor.rs @@ -2,9 +2,15 @@ use std::num::NonZeroUsize; +use super::compactor_scheduler::CompactorSchedulerConfig; + /// CLI config for compactor -#[derive(Debug, Copy, Clone, clap::Parser)] +#[derive(Debug, Clone, clap::Parser)] pub struct CompactorConfig { + /// Configuration for the compactor scheduler + #[clap(flatten)] + pub compactor_scheduler_config: CompactorSchedulerConfig, + /// Number of partitions that should be compacted in parallel. /// /// This should usually be larger than the compaction job diff --git a/compactor/src/components/hardcoded.rs b/compactor/src/components/hardcoded.rs index 4fbb81c215..ae19407fb7 100644 --- a/compactor/src/components/hardcoded.rs +++ b/compactor/src/components/hardcoded.rs @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration}; -use compactor_scheduler::PartitionsSource; +use compactor_scheduler::{create_scheduler, PartitionsSource, Scheduler}; use data_types::CompactionLevel; use object_store::memory::InMemory; @@ -87,8 +87,13 @@ use super::{ /// Get hardcoded components. pub fn hardcoded_components(config: &Config) -> Arc { + let scheduler = create_scheduler( + config.scheduler_config.clone(), + Arc::clone(&config.catalog), + Arc::clone(&config.time_provider), + ); let (partitions_source, commit, partition_done_sink) = - make_partitions_source_commit_partition_sink(config); + make_partitions_source_commit_partition_sink(config, Arc::clone(&scheduler)); Arc::new(Components { partition_stream: make_partition_stream(config, partitions_source), @@ -113,12 +118,13 @@ pub fn hardcoded_components(config: &Config) -> Arc { fn make_partitions_source_commit_partition_sink( config: &Config, + scheduler: Arc, ) -> ( Arc, Arc, Arc, ) { - let partitions_source = ScheduledPartitionsSource::new(config); + let partitions_source = ScheduledPartitionsSource::new(scheduler); let partition_done_sink: Arc = if config.shadow_mode { Arc::new(MockPartitionDoneSink::new()) diff --git a/compactor/src/components/partition_filter/mod.rs b/compactor/src/components/partition_filter/mod.rs index 9610a943a7..e8ed745d6b 100644 --- a/compactor/src/components/partition_filter/mod.rs +++ b/compactor/src/components/partition_filter/mod.rs @@ -19,10 +19,6 @@ pub mod or; /// Filters partition based on ID and Parquet files. /// /// May return an error. In this case, the partition will be marked as "skipped". -/// -/// If you only plan to inspect the ID but not the files and not perform any IO, check -/// [`IdOnlyPartitionFilter`](compactor_scheduler::IdOnlyPartitionFilter) -/// which usually runs earlier in the pipeline and hence is more efficient. #[async_trait] pub trait PartitionFilter: Debug + Display + Send + Sync { /// Return `true` if the compactor should run a compaction on this partition. Return `false` diff --git a/compactor/src/components/partitions_source/scheduled.rs b/compactor/src/components/partitions_source/scheduled.rs index deaa7b8a59..53fe408aaf 100644 --- a/compactor/src/components/partitions_source/scheduled.rs +++ b/compactor/src/components/partitions_source/scheduled.rs @@ -1,29 +1,16 @@ use std::sync::Arc; use async_trait::async_trait; -use compactor_scheduler::{LocalScheduler, PartitionsSource}; +use compactor_scheduler::{CompactionJob, PartitionsSource, Scheduler}; use data_types::PartitionId; -use crate::config::Config; - #[derive(Debug)] pub struct ScheduledPartitionsSource { - scheduler: LocalScheduler, // TODO: followon PR will replace with Arc + scheduler: Arc, } impl ScheduledPartitionsSource { - pub fn new(config: &Config) -> Self { - // TODO: followon PR: - // * will have the Arc provided as a component to the compactor - // * this Scheduler will be created with the below components - let scheduler = LocalScheduler::new( - config.partitions_source.clone(), - config.shard_config.clone(), - config.backoff_config.clone(), - Arc::clone(&config.catalog), - Arc::clone(&config.time_provider), - ); - + pub fn new(scheduler: Arc) -> Self { Self { scheduler } } } @@ -31,7 +18,8 @@ impl ScheduledPartitionsSource { #[async_trait] impl PartitionsSource for ScheduledPartitionsSource { async fn fetch(&self) -> Vec { - self.scheduler.fetch().await + let job: Vec = self.scheduler.get_jobs().await; + job.into_iter().map(|job| job.partition_id).collect() } } @@ -43,10 +31,7 @@ impl std::fmt::Display for ScheduledPartitionsSource { #[cfg(test)] mod tests { - use std::collections::HashSet; - - use backoff::BackoffConfig; - use compactor_scheduler::PartitionsSourceConfig; + use compactor_scheduler::create_test_scheduler; use iox_tests::TestCatalog; use iox_time::{MockProvider, Time}; @@ -54,12 +39,10 @@ mod tests { #[test] fn test_display() { - let scheduler = LocalScheduler::new( - PartitionsSourceConfig::Fixed(HashSet::new()), - None, - BackoffConfig::default(), + let scheduler = create_test_scheduler( TestCatalog::new().catalog(), Arc::new(MockProvider::new(Time::MIN)), + None, ); let source = ScheduledPartitionsSource { scheduler }; diff --git a/compactor/src/components/report.rs b/compactor/src/components/report.rs index 20f6300d46..0137447c45 100644 --- a/compactor/src/components/report.rs +++ b/compactor/src/components/report.rs @@ -1,6 +1,5 @@ //! Report component system state. -use compactor_scheduler::ShardConfig; use observability_deps::tracing::info; use crate::config::Config; @@ -14,6 +13,7 @@ pub fn log_config(config: &Config) { // no need to print the internal state of the registry metric_registry: _, catalog, + scheduler_config, parquet_store_real, parquet_store_scratchpad, exec, @@ -26,11 +26,9 @@ pub fn log_config(config: &Config) { percentage_max_file_size, split_percentage, partition_timeout, - partitions_source, shadow_mode, enable_scratchpad, ignore_partition_skip_marker, - shard_config, min_num_l1_files_to_compact, process_once, parquet_files_sink_override, @@ -42,15 +40,6 @@ pub fn log_config(config: &Config) { max_partition_fetch_queries_per_second, } = &config; - let (shard_cfg_n_shards, shard_cfg_shard_id) = match shard_config { - None => (None, None), - Some(shard_config) => { - // use struct unpack so we don't forget any members - let ShardConfig { n_shards, shard_id } = shard_config; - (Some(n_shards), Some(shard_id)) - } - }; - let parquet_files_sink_override = parquet_files_sink_override .as_ref() .map(|_| "Some") @@ -60,6 +49,7 @@ pub fn log_config(config: &Config) { info!( %catalog, + %scheduler_config, %parquet_store_real, %parquet_store_scratchpad, %exec, @@ -72,12 +62,9 @@ pub fn log_config(config: &Config) { percentage_max_file_size, split_percentage, partition_timeout_secs=partition_timeout.as_secs_f32(), - %partitions_source, shadow_mode, enable_scratchpad, ignore_partition_skip_marker, - ?shard_cfg_n_shards, - ?shard_cfg_shard_id, min_num_l1_files_to_compact, process_once, simulate_without_object_store, diff --git a/compactor/src/config.rs b/compactor/src/config.rs index f6bbfc3e08..63a2cf0abe 100644 --- a/compactor/src/config.rs +++ b/compactor/src/config.rs @@ -2,7 +2,7 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use backoff::BackoffConfig; -use compactor_scheduler::{PartitionsSourceConfig, ShardConfig}; +use compactor_scheduler::SchedulerConfig; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; use iox_time::TimeProvider; @@ -25,6 +25,9 @@ pub struct Config { /// Central catalog. pub catalog: Arc, + /// Scheduler configuration. + pub scheduler_config: SchedulerConfig, + /// Store holding the actual parquet files. pub parquet_store_real: ParquetStorage, @@ -78,9 +81,6 @@ pub struct Config { /// Maximum duration of the per-partition compaction task. pub partition_timeout: Duration, - /// Source of partitions to consider for comapction. - pub partitions_source: PartitionsSourceConfig, - /// Shadow mode. /// /// This will NOT write / commit any output to the object store or catalog. @@ -100,10 +100,6 @@ pub struct Config { /// This is mostly useful for debugging. pub ignore_partition_skip_marker: bool, - /// TODO: this will be removed in followup PR. - /// Shard config (if sharding should be enabled). - pub shard_config: Option, - /// Minimum number of L1 files to compact to L2 /// This is to prevent too many small files pub min_num_l1_files_to_compact: usize, diff --git a/compactor_scheduler/Cargo.toml b/compactor_scheduler/Cargo.toml index 0019662068..b99c1803d4 100644 --- a/compactor_scheduler/Cargo.toml +++ b/compactor_scheduler/Cargo.toml @@ -8,12 +8,12 @@ license.workspace = true [dependencies] async-trait = "0.1.68" backoff = { path = "../backoff" } -clap_blocks = { path = "../clap_blocks" } data_types = { path = "../data_types" } iox_catalog = { path = "../iox_catalog" } iox_time = { path = "../iox_time" } observability_deps = { path = "../observability_deps" } sharder = { path = "../sharder" } +uuid = { version = "1", features = ["v4"] } workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] diff --git a/compactor_scheduler/src/lib.rs b/compactor_scheduler/src/lib.rs index 06dd1f047a..7d7d35e7d4 100644 --- a/compactor_scheduler/src/lib.rs +++ b/compactor_scheduler/src/lib.rs @@ -18,15 +18,115 @@ )] #![allow(clippy::missing_docs_in_private_items)] +use std::collections::HashSet; + +use backoff::BackoffConfig; +use data_types::PartitionId; +use iox_time::TimeProvider; // Workaround for "unused crate" lint false positives. use workspace_hack as _; mod local_scheduler; -pub use local_scheduler::LocalScheduler; +pub(crate) use local_scheduler::{id_only_partition_filter::IdOnlyPartitionFilter, LocalScheduler}; +// configurations used externally during scheduler setup +pub use local_scheduler::{ + partitions_source_config::PartitionsSourceConfig, shard_config::ShardConfig, + LocalSchedulerConfig, +}; +// partitions_source trait mod partitions_source; pub use partitions_source::*; +// scheduler trait and associated types +mod scheduler; +pub use scheduler::*; -// Temporary exports. Will eventually be encapsulated in local_scheduler. -pub use local_scheduler::id_only_partition_filter::IdOnlyPartitionFilter; -pub use local_scheduler::partitions_source_config::PartitionsSourceConfig; -pub use local_scheduler::shard_config::ShardConfig; +use std::sync::Arc; + +use iox_catalog::interface::Catalog; + +/// Instantiate a compaction scheduler service +pub fn create_scheduler( + config: SchedulerConfig, + catalog: Arc, + time_provider: Arc, +) -> Arc { + match config { + SchedulerConfig::Local(scheduler_config) => { + let scheduler = LocalScheduler::new( + scheduler_config, + BackoffConfig::default(), + catalog, + time_provider, + ); + Arc::new(scheduler) + } + } +} + +/// Create a new [`Scheduler`] for testing. +/// +/// If no mocked_partition_ids, the scheduler will use a [`LocalScheduler`] in default configuration. +/// Whereas if mocked_partition_ids are provided, the scheduler will use a [`LocalScheduler`] with [`PartitionsSourceConfig::Fixed`]. +pub fn create_test_scheduler( + catalog: Arc, + time_provider: Arc, + mocked_partition_ids: Option>, +) -> Arc { + let scheduler_config = match mocked_partition_ids { + None => SchedulerConfig::default(), + Some(partition_ids) => SchedulerConfig::Local(LocalSchedulerConfig { + partitions_source_config: PartitionsSourceConfig::Fixed( + partition_ids.into_iter().collect::>(), + ), + shard_config: None, + }), + }; + create_scheduler(scheduler_config, catalog, time_provider) +} + +#[cfg(test)] +mod tests { + use iox_tests::TestCatalog; + use iox_time::{MockProvider, Time}; + + use super::*; + + #[test] + fn test_display_will_not_change_for_external_tests() { + let scheduler = create_test_scheduler( + TestCatalog::new().catalog(), + Arc::new(MockProvider::new(Time::MIN)), + None, + ); + + assert_eq!(scheduler.to_string(), "local_compaction_scheduler"); + + let scheduler = create_test_scheduler( + TestCatalog::new().catalog(), + Arc::new(MockProvider::new(Time::MIN)), + Some(vec![PartitionId::new(0)]), + ); + + assert_eq!(scheduler.to_string(), "local_compaction_scheduler"); + } + + #[tokio::test] + async fn test_test_scheduler_with_mocked_parition_ids() { + let partitions = vec![PartitionId::new(0), PartitionId::new(1234242)]; + + let scheduler = create_test_scheduler( + TestCatalog::new().catalog(), + Arc::new(MockProvider::new(Time::MIN)), + Some(partitions.clone()), + ); + let mut result = scheduler + .get_jobs() + .await + .iter() + .map(|j| j.partition_id) + .collect::>(); + result.sort(); + + assert_eq!(result, partitions); + } +} diff --git a/compactor_scheduler/src/local_scheduler.rs b/compactor_scheduler/src/local_scheduler.rs index 710beed7d7..2aa917ef1f 100644 --- a/compactor_scheduler/src/local_scheduler.rs +++ b/compactor_scheduler/src/local_scheduler.rs @@ -8,12 +8,14 @@ use std::sync::Arc; use async_trait::async_trait; use backoff::BackoffConfig; -use data_types::PartitionId; use iox_catalog::interface::Catalog; use iox_time::TimeProvider; use observability_deps::tracing::info; -use crate::{MockPartitionsSource, PartitionsSource, PartitionsSourceConfig, ShardConfig}; +use crate::{ + CompactionJob, MockPartitionsSource, PartitionsSource, PartitionsSourceConfig, Scheduler, + ShardConfig, +}; use self::{ id_only_partition_filter::{ @@ -26,23 +28,34 @@ use self::{ }, }; +/// Configuration specific to the local scheduler. +#[derive(Debug, Default, Clone)] +pub struct LocalSchedulerConfig { + /// The partitions source config used by the local sceduler. + pub partitions_source_config: PartitionsSourceConfig, + /// The shard config used by the local sceduler. + pub shard_config: Option, +} + /// Implementation of the scheduler for local (per compactor) scheduling. #[derive(Debug)] -pub struct LocalScheduler { +pub(crate) struct LocalScheduler { /// The partitions source to use for scheduling. partitions_source: Arc, + /// The shard config used for generating the PartitionsSource. + shard_config: Option, } impl LocalScheduler { /// Create a new [`LocalScheduler`]. - pub fn new( - config: PartitionsSourceConfig, - shard_config: Option, + pub(crate) fn new( + config: LocalSchedulerConfig, backoff_config: BackoffConfig, catalog: Arc, time_provider: Arc, ) -> Self { - let partitions_source: Arc = match &config { + let shard_config = config.shard_config; + let partitions_source: Arc = match &config.partitions_source_config { PartitionsSourceConfig::CatalogRecentWrites { threshold } => { Arc::new(CatalogToCompactPartitionsSource::new( backoff_config, @@ -79,20 +92,75 @@ impl LocalScheduler { partitions_source, )); - Self { partitions_source } + Self { + partitions_source, + shard_config, + } } } #[async_trait] -impl PartitionsSource for LocalScheduler { - // TODO: followon PR will replace with Arc.get_job() - async fn fetch(&self) -> Vec { - self.partitions_source.fetch().await +impl Scheduler for LocalScheduler { + async fn get_jobs(&self) -> Vec { + self.partitions_source + .fetch() + .await + .into_iter() + .map(CompactionJob::new) + .collect() } } impl std::fmt::Display for LocalScheduler { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "local_compaction_scheduler") + match &self.shard_config { + None => write!(f, "local_compaction_scheduler"), + Some(shard_config) => write!(f, "local_compaction_scheduler({shard_config})",), + } + } +} + +#[cfg(test)] +mod tests { + use iox_tests::TestCatalog; + use iox_time::{MockProvider, Time}; + + use super::*; + + #[test] + fn test_display() { + let scheduler = LocalScheduler::new( + LocalSchedulerConfig::default(), + BackoffConfig::default(), + TestCatalog::new().catalog(), + Arc::new(MockProvider::new(Time::MIN)), + ); + + assert_eq!(scheduler.to_string(), "local_compaction_scheduler",); + } + + #[test] + fn test_display_with_sharding() { + let shard_config = Some(ShardConfig { + n_shards: 2, + shard_id: 1, + }); + + let config = LocalSchedulerConfig { + partitions_source_config: PartitionsSourceConfig::default(), + shard_config, + }; + + let scheduler = LocalScheduler::new( + config, + BackoffConfig::default(), + TestCatalog::new().catalog(), + Arc::new(MockProvider::new(Time::MIN)), + ); + + assert_eq!( + scheduler.to_string(), + "local_compaction_scheduler(shard_cfg(n_shards=2,shard_id=1))", + ); } } diff --git a/compactor_scheduler/src/local_scheduler/id_only_partition_filter/mod.rs b/compactor_scheduler/src/local_scheduler/id_only_partition_filter/mod.rs index 008e34c7e2..03d59b64cf 100644 --- a/compactor_scheduler/src/local_scheduler/id_only_partition_filter/mod.rs +++ b/compactor_scheduler/src/local_scheduler/id_only_partition_filter/mod.rs @@ -12,7 +12,7 @@ pub(crate) mod shard; /// /// This will usually be used BEFORE any parquet files for the given partition are fetched and hence is a quite /// efficient filter stage. -pub trait IdOnlyPartitionFilter: Debug + Display + Send + Sync { +pub(crate) trait IdOnlyPartitionFilter: Debug + Display + Send + Sync { /// Returns true if the partition should be included. fn apply(&self, partition_id: PartitionId) -> bool; } diff --git a/compactor_scheduler/src/local_scheduler/partitions_source_config.rs b/compactor_scheduler/src/local_scheduler/partitions_source_config.rs index 1d7c8425b4..dc9d65166b 100644 --- a/compactor_scheduler/src/local_scheduler/partitions_source_config.rs +++ b/compactor_scheduler/src/local_scheduler/partitions_source_config.rs @@ -1,6 +1,5 @@ use std::{collections::HashSet, fmt::Display, time::Duration}; -use clap_blocks::compactor_scheduler::PartitionSourceConfigForLocalScheduler; use data_types::PartitionId; /// Default threshold for hot partitions @@ -50,89 +49,3 @@ impl Default for PartitionsSourceConfig { } } } - -impl PartitionsSourceConfig { - /// Create a new [`PartitionsSourceConfig`] from the CLI|env config. - pub fn from_config(config: PartitionSourceConfigForLocalScheduler) -> PartitionsSourceConfig { - let PartitionSourceConfigForLocalScheduler { - partition_filter, - process_all_partitions, - compaction_partition_minute_threshold, - } = config; - - match (partition_filter, process_all_partitions) { - (None, false) => PartitionsSourceConfig::CatalogRecentWrites { - threshold: Duration::from_secs(compaction_partition_minute_threshold * 60), - }, - (None, true) => PartitionsSourceConfig::CatalogAll, - (Some(ids), false) => { - PartitionsSourceConfig::Fixed(ids.iter().cloned().map(PartitionId::new).collect()) - } - (Some(_), true) => panic!( - "provided partition ID filter and specific 'process all', this does not make sense" - ), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - #[should_panic( - expected = "provided partition ID filter and specific 'process all', this does not make sense" - )] - fn process_all_and_partition_filter_incompatible() { - let config = PartitionSourceConfigForLocalScheduler { - compaction_partition_minute_threshold: 10, - partition_filter: Some(vec![1, 7]), - process_all_partitions: true, - }; - PartitionsSourceConfig::from_config(config); - } - - #[test] - fn fixed_list_of_partitions() { - let config = PartitionSourceConfigForLocalScheduler { - compaction_partition_minute_threshold: 10, - partition_filter: Some(vec![1, 7]), - process_all_partitions: false, - }; - let partitions_source_config = PartitionsSourceConfig::from_config(config); - - assert_eq!( - partitions_source_config, - PartitionsSourceConfig::Fixed([PartitionId::new(1), PartitionId::new(7)].into()) - ); - } - - #[test] - fn all_in_the_catalog() { - let config = PartitionSourceConfigForLocalScheduler { - compaction_partition_minute_threshold: 10, - partition_filter: None, - process_all_partitions: true, - }; - let partitions_source_config = PartitionsSourceConfig::from_config(config); - - assert_eq!(partitions_source_config, PartitionsSourceConfig::CatalogAll,); - } - - #[test] - fn normal_compaction() { - let config = PartitionSourceConfigForLocalScheduler { - compaction_partition_minute_threshold: 10, - partition_filter: None, - process_all_partitions: false, - }; - let partitions_source_config = PartitionsSourceConfig::from_config(config); - - assert_eq!( - partitions_source_config, - PartitionsSourceConfig::CatalogRecentWrites { - threshold: Duration::from_secs(600) - }, - ); - } -} diff --git a/compactor_scheduler/src/local_scheduler/shard_config.rs b/compactor_scheduler/src/local_scheduler/shard_config.rs index 8e23573679..f041092cdd 100644 --- a/compactor_scheduler/src/local_scheduler/shard_config.rs +++ b/compactor_scheduler/src/local_scheduler/shard_config.rs @@ -1,5 +1,3 @@ -use clap_blocks::compactor_scheduler::ShardConfigForLocalScheduler; - /// Shard config. /// configured per LocalScheduler, which equates to per compactor. #[derive(Debug, Clone)] @@ -14,35 +12,13 @@ pub struct ShardConfig { pub shard_id: usize, } -impl ShardConfig { - /// Create a new [`ShardConfig`] from a [`ShardConfigForLocalScheduler`]. - pub fn from_config(config: ShardConfigForLocalScheduler) -> Option { - match (config.shard_count, config.shard_id, config.hostname) { - // if no shard_count is provided, then we are not sharding - (None, _, _) => None, - // always use the shard_id if provided - (Some(shard_count), Some(shard_id), _) => Some(ShardConfig { - shard_id, - n_shards: shard_count, - }), - // if no shard_id is provided, then we are sharding by hostname - (Some(shard_count), None, Some(hostname)) => { - let parsed_id = hostname - .chars() - .skip_while(|ch| !ch.is_ascii_digit()) - .take_while(|ch| ch.is_ascii_digit()) - .fold(None, |acc, ch| { - ch.to_digit(10).map(|b| acc.unwrap_or(0) * 10 + b) - }); - assert!(parsed_id.is_some(), "hostname must end in a shard ID"); - Some(ShardConfig { - shard_id: parsed_id.unwrap() as usize, - n_shards: shard_count, - }) - } - (Some(_), None, None) => { - panic!("shard_count must be paired with either shard_id or hostname") - } - } +impl std::fmt::Display for ShardConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { n_shards, shard_id } = self; + write!( + f, + "shard_cfg(n_shards={:?},shard_id={:?})", + n_shards, shard_id + ) } } diff --git a/compactor_scheduler/src/scheduler.rs b/compactor_scheduler/src/scheduler.rs new file mode 100644 index 0000000000..13f5b8355d --- /dev/null +++ b/compactor_scheduler/src/scheduler.rs @@ -0,0 +1,62 @@ +use std::fmt::{Debug, Display}; + +use async_trait::async_trait; +use data_types::PartitionId; +use uuid::Uuid; + +use crate::LocalSchedulerConfig; + +/// Scheduler configuration. +#[derive(Debug, Clone)] +pub enum SchedulerConfig { + /// Configuration specific to the [`LocalScheduler`](crate::LocalScheduler). + Local(LocalSchedulerConfig), +} + +impl Default for SchedulerConfig { + fn default() -> Self { + Self::Local(LocalSchedulerConfig::default()) + } +} + +impl std::fmt::Display for SchedulerConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SchedulerConfig::Local(LocalSchedulerConfig { + shard_config, + partitions_source_config: _, + }) => match &shard_config { + None => write!(f, "local_compaction_scheduler"), + Some(shard_config) => write!(f, "local_compaction_scheduler({shard_config})",), + }, + } + } +} + +/// Job assignment for a given partition. +#[derive(Debug)] +pub struct CompactionJob { + #[allow(dead_code)] + /// Unique identifier for this job. + /// Should not be the same as the partition id. + uuid: Uuid, + /// Leased partition. + pub partition_id: PartitionId, +} + +impl CompactionJob { + /// Create new job. + pub fn new(partition_id: PartitionId) -> Self { + Self { + uuid: Uuid::new_v4(), + partition_id, + } + } +} + +/// Core trait used for all schedulers. +#[async_trait] +pub trait Scheduler: Send + Sync + Debug + Display { + /// Get partitions to be compacted. + async fn get_jobs(&self) -> Vec; +} diff --git a/compactor_test_utils/src/lib.rs b/compactor_test_utils/src/lib.rs index 22bad3b2d5..c0be0b889d 100644 --- a/compactor_test_utils/src/lib.rs +++ b/compactor_test_utils/src/lib.rs @@ -41,7 +41,7 @@ use compactor::{ compact, config::Config, hardcoded_components, Components, PanicDataFusionPlanner, PartitionInfo, }; -use compactor_scheduler::PartitionsSourceConfig; +use compactor_scheduler::SchedulerConfig; use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId}; use datafusion::arrow::record_batch::RecordBatch; use datafusion_util::config::register_iox_object_store; @@ -59,7 +59,6 @@ use schema::sort::SortKey; use tracker::AsyncSemaphoreMetrics; // Default values for the test setup builder -const PARTITION_THRESHOLD: Duration = Duration::from_secs(10 * 60); // 10min const MAX_DESIRE_FILE_SIZE: u64 = 100 * 1024; const PERCENTAGE_MAX_FILE_SIZE: u16 = 5; const SPLIT_PERCENTAGE: u16 = 80; @@ -122,6 +121,7 @@ impl TestSetupBuilder { let config = Config { metric_registry: catalog.metric_registry(), catalog: catalog.catalog(), + scheduler_config: SchedulerConfig::default(), parquet_store_real: catalog.parquet_store.clone(), parquet_store_scratchpad: ParquetStorage::new( Arc::new(object_store::memory::InMemory::new()), @@ -137,13 +137,9 @@ impl TestSetupBuilder { percentage_max_file_size: PERCENTAGE_MAX_FILE_SIZE, split_percentage: SPLIT_PERCENTAGE, partition_timeout: Duration::from_secs(3_600), - partitions_source: PartitionsSourceConfig::CatalogRecentWrites { - threshold: PARTITION_THRESHOLD, - }, shadow_mode: false, enable_scratchpad: true, ignore_partition_skip_marker: false, - shard_config: None, min_num_l1_files_to_compact: MIN_NUM_L1_FILES_TO_COMPACT, process_once: true, simulate_without_object_store: false, diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 97e3694a0a..2c2e3519a0 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -486,6 +486,7 @@ impl Config { // settings from other configs. Can't use `#clap(flatten)` as the // parameters are redundant with ingester's let compactor_config = CompactorConfig { + compactor_scheduler_config, compaction_partition_concurrency: NonZeroUsize::new(1).unwrap(), compaction_df_concurrency: NonZeroUsize::new(1).unwrap(), compaction_partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(), @@ -523,7 +524,6 @@ impl Config { ingester_run_config, compactor_run_config, - compactor_scheduler_config, catalog_dsn, ingester_config, @@ -551,8 +551,6 @@ struct SpecializedConfig { querier_run_config: RunConfig, ingester_run_config: RunConfig, compactor_run_config: RunConfig, - #[allow(dead_code)] - compactor_scheduler_config: CompactorSchedulerConfig, catalog_dsn: CatalogDsnConfig, ingester_config: IngesterConfig, @@ -567,7 +565,6 @@ pub async fn command(config: Config) -> Result<()> { querier_run_config, ingester_run_config, compactor_run_config, - compactor_scheduler_config, catalog_dsn, ingester_config, router_config, @@ -653,7 +650,6 @@ pub async fn command(config: Config) -> Result<()> { Arc::clone(&exec), Arc::clone(&time_provider), compactor_config, - compactor_scheduler_config, ) .await; diff --git a/influxdb_iox/src/commands/run/compactor.rs b/influxdb_iox/src/commands/run/compactor.rs index 72f2bdd016..5b90ca4571 100644 --- a/influxdb_iox/src/commands/run/compactor.rs +++ b/influxdb_iox/src/commands/run/compactor.rs @@ -3,8 +3,7 @@ use super::main; use crate::process_info::setup_metric_registry; use clap_blocks::{ - catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig, - compactor_scheduler::CompactorSchedulerConfig, object_store::make_object_store, + catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig, object_store::make_object_store, run_config::RunConfig, }; use compactor::object_store::metrics::MetricsStore; @@ -65,9 +64,6 @@ pub struct Config { #[clap(flatten)] pub(crate) compactor_config: CompactorConfig, - - #[clap(flatten)] - compactor_scheduler_config: CompactorSchedulerConfig, } pub async fn command(config: Config) -> Result<(), Error> { @@ -130,7 +126,6 @@ pub async fn command(config: Config) -> Result<(), Error> { exec, time_provider, config.compactor_config, - config.compactor_scheduler_config, ) .await; diff --git a/ioxd_compactor/Cargo.toml b/ioxd_compactor/Cargo.toml index 76fb52488a..e093402d01 100644 --- a/ioxd_compactor/Cargo.toml +++ b/ioxd_compactor/Cargo.toml @@ -11,6 +11,7 @@ backoff = { path = "../backoff" } clap_blocks = { path = "../clap_blocks" } compactor = { path = "../compactor" } compactor_scheduler = { path = "../compactor_scheduler" } +data_types = { path = "../data_types" } hyper = "0.14" iox_catalog = { path = "../iox_catalog" } iox_time = { path = "../iox_time" } diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index 71821ce398..1ed4cdfe3d 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -10,15 +10,15 @@ missing_debug_implementations, unused_crate_dependencies )] +mod scheduler_config; // Workaround for "unused crate" lint false positives. use workspace_hack as _; use async_trait::async_trait; use backoff::BackoffConfig; -use clap_blocks::{compactor::CompactorConfig, compactor_scheduler::CompactorSchedulerConfig}; +use clap_blocks::compactor::CompactorConfig; use compactor::{compactor::Compactor, config::Config}; -use compactor_scheduler::{PartitionsSourceConfig, ShardConfig}; use hyper::{Body, Request, Response}; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; @@ -41,6 +41,8 @@ use std::{ use tokio_util::sync::CancellationToken; use trace::TraceCollector; +use crate::scheduler_config::convert_scheduler_config; + pub struct CompactorServerType { compactor: Compactor, metric_registry: Arc, @@ -153,19 +155,15 @@ pub async fn create_compactor_server_type( exec: Arc, time_provider: Arc, compactor_config: CompactorConfig, - // temporary dependency, until the rest of the code is moved over to the compactor_scheduler - compactor_scheduler_config: CompactorSchedulerConfig, ) -> Arc { let backoff_config = BackoffConfig::default(); - let shard_config = ShardConfig::from_config(compactor_scheduler_config.shard_config); - - let partitions_source = - PartitionsSourceConfig::from_config(compactor_scheduler_config.partition_source_config); - let compactor = Compactor::start(Config { metric_registry: Arc::clone(&metric_registry), catalog, + scheduler_config: convert_scheduler_config( + compactor_config.compactor_scheduler_config.clone(), + ), parquet_store_real, parquet_store_scratchpad, exec, @@ -179,11 +177,9 @@ pub async fn create_compactor_server_type( percentage_max_file_size: compactor_config.percentage_max_file_size, split_percentage: compactor_config.split_percentage, partition_timeout: Duration::from_secs(compactor_config.partition_timeout_secs), - partitions_source, shadow_mode: compactor_config.shadow_mode, enable_scratchpad: compactor_config.enable_scratchpad, ignore_partition_skip_marker: compactor_config.ignore_partition_skip_marker, - shard_config, min_num_l1_files_to_compact: compactor_config.min_num_l1_files_to_compact, process_once: compactor_config.process_once, simulate_without_object_store: false, diff --git a/ioxd_compactor/src/scheduler_config.rs b/ioxd_compactor/src/scheduler_config.rs new file mode 100644 index 0000000000..ca3156d942 --- /dev/null +++ b/ioxd_compactor/src/scheduler_config.rs @@ -0,0 +1,138 @@ +use std::time::Duration; + +use clap_blocks::compactor_scheduler::{ + CompactorSchedulerConfig, CompactorSchedulerType, PartitionSourceConfigForLocalScheduler, + ShardConfigForLocalScheduler, +}; +use compactor_scheduler::{ + LocalSchedulerConfig, PartitionsSourceConfig, SchedulerConfig, ShardConfig, +}; +use data_types::PartitionId; + +fn convert_partitions_source_config( + config: PartitionSourceConfigForLocalScheduler, +) -> PartitionsSourceConfig { + let PartitionSourceConfigForLocalScheduler { + partition_filter, + process_all_partitions, + compaction_partition_minute_threshold, + } = config; + + match (partition_filter, process_all_partitions) { + (None, false) => PartitionsSourceConfig::CatalogRecentWrites { + threshold: Duration::from_secs(compaction_partition_minute_threshold * 60), + }, + (None, true) => PartitionsSourceConfig::CatalogAll, + (Some(ids), false) => { + PartitionsSourceConfig::Fixed(ids.iter().cloned().map(PartitionId::new).collect()) + } + (Some(_), true) => panic!( + "provided partition ID filter and specific 'process all', this does not make sense" + ), + } +} + +/// Create a new [`ShardConfig`] from a [`ShardConfigForLocalScheduler`]. +fn convert_shard_config(config: ShardConfigForLocalScheduler) -> Option { + match (config.shard_count, config.shard_id, config.hostname) { + // if no shard_count is provided, then we are not sharding + (None, _, _) => None, + // always use the shard_id if provided + (Some(shard_count), Some(shard_id), _) => Some(ShardConfig { + shard_id, + n_shards: shard_count, + }), + // if no shard_id is provided, then we are sharding by hostname + (Some(shard_count), None, Some(hostname)) => { + let parsed_id = hostname + .chars() + .skip_while(|ch| !ch.is_ascii_digit()) + .take_while(|ch| ch.is_ascii_digit()) + .fold(None, |acc, ch| { + ch.to_digit(10).map(|b| acc.unwrap_or(0) * 10 + b) + }); + assert!(parsed_id.is_some(), "hostname must end in a shard ID"); + Some(ShardConfig { + shard_id: parsed_id.unwrap() as usize, + n_shards: shard_count, + }) + } + (Some(_), None, None) => { + panic!("shard_count must be paired with either shard_id or hostname") + } + } +} + +pub(crate) fn convert_scheduler_config(config: CompactorSchedulerConfig) -> SchedulerConfig { + match config.compactor_scheduler_type { + CompactorSchedulerType::Local => SchedulerConfig::Local(LocalSchedulerConfig { + partitions_source_config: convert_partitions_source_config( + config.partition_source_config, + ), + shard_config: convert_shard_config(config.shard_config), + }), + CompactorSchedulerType::Remote => unimplemented!("Remote scheduler not implemented"), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + #[should_panic( + expected = "provided partition ID filter and specific 'process all', this does not make sense" + )] + fn process_all_and_partition_filter_incompatible() { + let config = PartitionSourceConfigForLocalScheduler { + compaction_partition_minute_threshold: 10, + partition_filter: Some(vec![1, 7]), + process_all_partitions: true, + }; + convert_partitions_source_config(config); + } + + #[test] + fn fixed_list_of_partitions() { + let config = PartitionSourceConfigForLocalScheduler { + compaction_partition_minute_threshold: 10, + partition_filter: Some(vec![1, 7]), + process_all_partitions: false, + }; + let partitions_source_config = convert_partitions_source_config(config); + + assert_eq!( + partitions_source_config, + PartitionsSourceConfig::Fixed([PartitionId::new(1), PartitionId::new(7)].into()) + ); + } + + #[test] + fn all_in_the_catalog() { + let config = PartitionSourceConfigForLocalScheduler { + compaction_partition_minute_threshold: 10, + partition_filter: None, + process_all_partitions: true, + }; + let partitions_source_config = convert_partitions_source_config(config); + + assert_eq!(partitions_source_config, PartitionsSourceConfig::CatalogAll,); + } + + #[test] + fn normal_compaction() { + let config = PartitionSourceConfigForLocalScheduler { + compaction_partition_minute_threshold: 10, + partition_filter: None, + process_all_partitions: false, + }; + let partitions_source_config = convert_partitions_source_config(config); + + assert_eq!( + partitions_source_config, + PartitionsSourceConfig::CatalogRecentWrites { + threshold: Duration::from_secs(600) + }, + ); + } +}