feat(idpe 17789): provide scheduler interface (#8057)

* feat: provide convenience methods to create a Scheduler, and keep the scheduler implementations crate-private. External crates can only create a Scheduler from configs (a short usage sketch follows these notes).

* feat: provide the Scheduler as a component to the compactor. Specifically, the scheduler configs live within the compactor run config, and the scheduler is created within the compactor hardcoded components.

* feat: within the compactor ScheduledPartitionsSource, use the dyn Scheduler and Scheduler::get_jobs()

* feat: CompactionJob should be per partition and carry a unique identifier that is independent of the partition

* feat: keep compactor_scheduler separate from clap_blocks. The only interface is in ioxd_compactor, where the CLI configs are transformed into ShardConfig and PartitionsSourceConfig.

* chore: make IdOnlyPartitionFilter pub(crate) only

* chore: update the scheduler display to include any report information (i.e. shard_config, if present)
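
A short usage sketch of the new public surface (hedged: only create_scheduler, SchedulerConfig, Scheduler, CompactionJob, and get_jobs come from this change; the function name and the catalog/time-provider wiring below are illustrative):

    use std::sync::Arc;

    use compactor_scheduler::{create_scheduler, CompactionJob, Scheduler, SchedulerConfig};
    use iox_catalog::interface::Catalog;
    use iox_time::TimeProvider;

    async fn fetch_partitions_to_compact(
        catalog: Arc<dyn Catalog>,
        time_provider: Arc<dyn TimeProvider>,
    ) -> Vec<CompactionJob> {
        // external crates never see LocalScheduler; they only pick a SchedulerConfig variant
        let scheduler: Arc<dyn Scheduler> = create_scheduler(
            SchedulerConfig::default(),
            catalog,
            time_provider,
        );
        // each CompactionJob leases one partition and carries its own UUID
        scheduler.get_jobs().await
    }
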
wiedld 2023-06-28 15:04:00 -07:00 committed by GitHub
parent 33be68477f
commit 3a8a8a153e
20 changed files with 440 additions and 224 deletions

Cargo.lock (generated)

@ -1011,7 +1011,6 @@ version = "0.1.0"
dependencies = [
"async-trait",
"backoff",
"clap_blocks",
"data_types",
"iox_catalog",
"iox_tests",
@ -1019,6 +1018,7 @@ dependencies = [
"observability_deps",
"sharder",
"tokio",
"uuid",
"workspace-hack",
]
@ -3008,6 +3008,7 @@ dependencies = [
"clap_blocks",
"compactor",
"compactor_scheduler",
"data_types",
"hyper",
"iox_catalog",
"iox_query",


@ -2,9 +2,15 @@
use std::num::NonZeroUsize;
use super::compactor_scheduler::CompactorSchedulerConfig;
/// CLI config for compactor
#[derive(Debug, Copy, Clone, clap::Parser)]
#[derive(Debug, Clone, clap::Parser)]
pub struct CompactorConfig {
/// Configuration for the compactor scheduler
#[clap(flatten)]
pub compactor_scheduler_config: CompactorSchedulerConfig,
/// Number of partitions that should be compacted in parallel.
///
/// This should usually be larger than the compaction job


@ -4,7 +4,7 @@
use std::{sync::Arc, time::Duration};
use compactor_scheduler::PartitionsSource;
use compactor_scheduler::{create_scheduler, PartitionsSource, Scheduler};
use data_types::CompactionLevel;
use object_store::memory::InMemory;
@ -87,8 +87,13 @@ use super::{
/// Get hardcoded components.
pub fn hardcoded_components(config: &Config) -> Arc<Components> {
let scheduler = create_scheduler(
config.scheduler_config.clone(),
Arc::clone(&config.catalog),
Arc::clone(&config.time_provider),
);
let (partitions_source, commit, partition_done_sink) =
make_partitions_source_commit_partition_sink(config);
make_partitions_source_commit_partition_sink(config, Arc::clone(&scheduler));
Arc::new(Components {
partition_stream: make_partition_stream(config, partitions_source),
@ -113,12 +118,13 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
fn make_partitions_source_commit_partition_sink(
config: &Config,
scheduler: Arc<dyn Scheduler>,
) -> (
Arc<dyn PartitionsSource>,
Arc<dyn Commit>,
Arc<dyn PartitionDoneSink>,
) {
let partitions_source = ScheduledPartitionsSource::new(config);
let partitions_source = ScheduledPartitionsSource::new(scheduler);
let partition_done_sink: Arc<dyn PartitionDoneSink> = if config.shadow_mode {
Arc::new(MockPartitionDoneSink::new())


@ -19,10 +19,6 @@ pub mod or;
/// Filters partition based on ID and Parquet files.
///
/// May return an error. In this case, the partition will be marked as "skipped".
///
/// If you only plan to inspect the ID but not the files and not perform any IO, check
/// [`IdOnlyPartitionFilter`](compactor_scheduler::IdOnlyPartitionFilter)
/// which usually runs earlier in the pipeline and hence is more efficient.
#[async_trait]
pub trait PartitionFilter: Debug + Display + Send + Sync {
/// Return `true` if the compactor should run a compaction on this partition. Return `false`


@ -1,29 +1,16 @@
use std::sync::Arc;
use async_trait::async_trait;
use compactor_scheduler::{LocalScheduler, PartitionsSource};
use compactor_scheduler::{CompactionJob, PartitionsSource, Scheduler};
use data_types::PartitionId;
use crate::config::Config;
#[derive(Debug)]
pub struct ScheduledPartitionsSource {
scheduler: LocalScheduler, // TODO: followon PR will replace with Arc<dyn Scheduler>
scheduler: Arc<dyn Scheduler>,
}
impl ScheduledPartitionsSource {
pub fn new(config: &Config) -> Self {
// TODO: followon PR:
// * will have the Arc<dyn Scheduler> provided as a component to the compactor
// * this Scheduler will be created with the below components
let scheduler = LocalScheduler::new(
config.partitions_source.clone(),
config.shard_config.clone(),
config.backoff_config.clone(),
Arc::clone(&config.catalog),
Arc::clone(&config.time_provider),
);
pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
Self { scheduler }
}
}
@ -31,7 +18,8 @@ impl ScheduledPartitionsSource {
#[async_trait]
impl PartitionsSource for ScheduledPartitionsSource {
async fn fetch(&self) -> Vec<PartitionId> {
self.scheduler.fetch().await
let job: Vec<CompactionJob> = self.scheduler.get_jobs().await;
job.into_iter().map(|job| job.partition_id).collect()
}
}
@ -43,10 +31,7 @@ impl std::fmt::Display for ScheduledPartitionsSource {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use backoff::BackoffConfig;
use compactor_scheduler::PartitionsSourceConfig;
use compactor_scheduler::create_test_scheduler;
use iox_tests::TestCatalog;
use iox_time::{MockProvider, Time};
@ -54,12 +39,10 @@ mod tests {
#[test]
fn test_display() {
let scheduler = LocalScheduler::new(
PartitionsSourceConfig::Fixed(HashSet::new()),
None,
BackoffConfig::default(),
let scheduler = create_test_scheduler(
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
None,
);
let source = ScheduledPartitionsSource { scheduler };


@ -1,6 +1,5 @@
//! Report component system state.
use compactor_scheduler::ShardConfig;
use observability_deps::tracing::info;
use crate::config::Config;
@ -14,6 +13,7 @@ pub fn log_config(config: &Config) {
// no need to print the internal state of the registry
metric_registry: _,
catalog,
scheduler_config,
parquet_store_real,
parquet_store_scratchpad,
exec,
@ -26,11 +26,9 @@ pub fn log_config(config: &Config) {
percentage_max_file_size,
split_percentage,
partition_timeout,
partitions_source,
shadow_mode,
enable_scratchpad,
ignore_partition_skip_marker,
shard_config,
min_num_l1_files_to_compact,
process_once,
parquet_files_sink_override,
@ -42,15 +40,6 @@ pub fn log_config(config: &Config) {
max_partition_fetch_queries_per_second,
} = &config;
let (shard_cfg_n_shards, shard_cfg_shard_id) = match shard_config {
None => (None, None),
Some(shard_config) => {
// use struct unpack so we don't forget any members
let ShardConfig { n_shards, shard_id } = shard_config;
(Some(n_shards), Some(shard_id))
}
};
let parquet_files_sink_override = parquet_files_sink_override
.as_ref()
.map(|_| "Some")
@ -60,6 +49,7 @@ pub fn log_config(config: &Config) {
info!(
%catalog,
%scheduler_config,
%parquet_store_real,
%parquet_store_scratchpad,
%exec,
@ -72,12 +62,9 @@ pub fn log_config(config: &Config) {
percentage_max_file_size,
split_percentage,
partition_timeout_secs=partition_timeout.as_secs_f32(),
%partitions_source,
shadow_mode,
enable_scratchpad,
ignore_partition_skip_marker,
?shard_cfg_n_shards,
?shard_cfg_shard_id,
min_num_l1_files_to_compact,
process_once,
simulate_without_object_store,


@ -2,7 +2,7 @@
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use backoff::BackoffConfig;
use compactor_scheduler::{PartitionsSourceConfig, ShardConfig};
use compactor_scheduler::SchedulerConfig;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use iox_time::TimeProvider;
@ -25,6 +25,9 @@ pub struct Config {
/// Central catalog.
pub catalog: Arc<dyn Catalog>,
/// Scheduler configuration.
pub scheduler_config: SchedulerConfig,
/// Store holding the actual parquet files.
pub parquet_store_real: ParquetStorage,
@ -78,9 +81,6 @@ pub struct Config {
/// Maximum duration of the per-partition compaction task.
pub partition_timeout: Duration,
/// Source of partitions to consider for compaction.
pub partitions_source: PartitionsSourceConfig,
/// Shadow mode.
///
/// This will NOT write / commit any output to the object store or catalog.
@ -100,10 +100,6 @@ pub struct Config {
/// This is mostly useful for debugging.
pub ignore_partition_skip_marker: bool,
/// TODO: this will be removed in followup PR.
/// Shard config (if sharding should be enabled).
pub shard_config: Option<ShardConfig>,
/// Minimum number of L1 files to compact to L2
/// This is to prevent too many small files
pub min_num_l1_files_to_compact: usize,


@ -8,12 +8,12 @@ license.workspace = true
[dependencies]
async-trait = "0.1.68"
backoff = { path = "../backoff" }
clap_blocks = { path = "../clap_blocks" }
data_types = { path = "../data_types" }
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
observability_deps = { path = "../observability_deps" }
sharder = { path = "../sharder" }
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]


@ -18,15 +18,115 @@
)]
#![allow(clippy::missing_docs_in_private_items)]
use std::collections::HashSet;
use backoff::BackoffConfig;
use data_types::PartitionId;
use iox_time::TimeProvider;
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
mod local_scheduler;
pub use local_scheduler::LocalScheduler;
pub(crate) use local_scheduler::{id_only_partition_filter::IdOnlyPartitionFilter, LocalScheduler};
// configurations used externally during scheduler setup
pub use local_scheduler::{
partitions_source_config::PartitionsSourceConfig, shard_config::ShardConfig,
LocalSchedulerConfig,
};
// partitions_source trait
mod partitions_source;
pub use partitions_source::*;
// scheduler trait and associated types
mod scheduler;
pub use scheduler::*;
// Temporary exports. Will eventually be encapsulated in local_scheduler.
pub use local_scheduler::id_only_partition_filter::IdOnlyPartitionFilter;
pub use local_scheduler::partitions_source_config::PartitionsSourceConfig;
pub use local_scheduler::shard_config::ShardConfig;
use std::sync::Arc;
use iox_catalog::interface::Catalog;
/// Instantiate a compaction scheduler service
pub fn create_scheduler(
config: SchedulerConfig,
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
) -> Arc<dyn Scheduler> {
match config {
SchedulerConfig::Local(scheduler_config) => {
let scheduler = LocalScheduler::new(
scheduler_config,
BackoffConfig::default(),
catalog,
time_provider,
);
Arc::new(scheduler)
}
}
}
/// Create a new [`Scheduler`] for testing.
///
/// If no mocked_partition_ids are provided, the scheduler will use a [`LocalScheduler`] in its default configuration.
/// If mocked_partition_ids are provided, the scheduler will use a [`LocalScheduler`] with [`PartitionsSourceConfig::Fixed`].
pub fn create_test_scheduler(
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
mocked_partition_ids: Option<Vec<PartitionId>>,
) -> Arc<dyn Scheduler> {
let scheduler_config = match mocked_partition_ids {
None => SchedulerConfig::default(),
Some(partition_ids) => SchedulerConfig::Local(LocalSchedulerConfig {
partitions_source_config: PartitionsSourceConfig::Fixed(
partition_ids.into_iter().collect::<HashSet<PartitionId>>(),
),
shard_config: None,
}),
};
create_scheduler(scheduler_config, catalog, time_provider)
}
#[cfg(test)]
mod tests {
use iox_tests::TestCatalog;
use iox_time::{MockProvider, Time};
use super::*;
#[test]
fn test_display_will_not_change_for_external_tests() {
let scheduler = create_test_scheduler(
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
None,
);
assert_eq!(scheduler.to_string(), "local_compaction_scheduler");
let scheduler = create_test_scheduler(
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
Some(vec![PartitionId::new(0)]),
);
assert_eq!(scheduler.to_string(), "local_compaction_scheduler");
}
#[tokio::test]
async fn test_test_scheduler_with_mocked_partition_ids() {
let partitions = vec![PartitionId::new(0), PartitionId::new(1234242)];
let scheduler = create_test_scheduler(
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
Some(partitions.clone()),
);
let mut result = scheduler
.get_jobs()
.await
.iter()
.map(|j| j.partition_id)
.collect::<Vec<PartitionId>>();
result.sort();
assert_eq!(result, partitions);
}
}


@ -8,12 +8,14 @@ use std::sync::Arc;
use async_trait::async_trait;
use backoff::BackoffConfig;
use data_types::PartitionId;
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use observability_deps::tracing::info;
use crate::{MockPartitionsSource, PartitionsSource, PartitionsSourceConfig, ShardConfig};
use crate::{
CompactionJob, MockPartitionsSource, PartitionsSource, PartitionsSourceConfig, Scheduler,
ShardConfig,
};
use self::{
id_only_partition_filter::{
@ -26,23 +28,34 @@ use self::{
},
};
/// Configuration specific to the local scheduler.
#[derive(Debug, Default, Clone)]
pub struct LocalSchedulerConfig {
/// The partitions source config used by the local scheduler.
pub partitions_source_config: PartitionsSourceConfig,
/// The shard config used by the local scheduler.
pub shard_config: Option<ShardConfig>,
}
/// Implementation of the scheduler for local (per compactor) scheduling.
#[derive(Debug)]
pub struct LocalScheduler {
pub(crate) struct LocalScheduler {
/// The partitions source to use for scheduling.
partitions_source: Arc<dyn PartitionsSource>,
/// The shard config used for generating the PartitionsSource.
shard_config: Option<ShardConfig>,
}
impl LocalScheduler {
/// Create a new [`LocalScheduler`].
pub fn new(
config: PartitionsSourceConfig,
shard_config: Option<ShardConfig>,
pub(crate) fn new(
config: LocalSchedulerConfig,
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
) -> Self {
let partitions_source: Arc<dyn PartitionsSource> = match &config {
let shard_config = config.shard_config;
let partitions_source: Arc<dyn PartitionsSource> = match &config.partitions_source_config {
PartitionsSourceConfig::CatalogRecentWrites { threshold } => {
Arc::new(CatalogToCompactPartitionsSource::new(
backoff_config,
@ -79,20 +92,75 @@ impl LocalScheduler {
partitions_source,
));
Self { partitions_source }
Self {
partitions_source,
shard_config,
}
}
}
#[async_trait]
impl PartitionsSource for LocalScheduler {
// TODO: followon PR will replace with Arc<dyn Scheduler>.get_job()
async fn fetch(&self) -> Vec<PartitionId> {
self.partitions_source.fetch().await
impl Scheduler for LocalScheduler {
async fn get_jobs(&self) -> Vec<CompactionJob> {
self.partitions_source
.fetch()
.await
.into_iter()
.map(CompactionJob::new)
.collect()
}
}
impl std::fmt::Display for LocalScheduler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "local_compaction_scheduler")
match &self.shard_config {
None => write!(f, "local_compaction_scheduler"),
Some(shard_config) => write!(f, "local_compaction_scheduler({shard_config})",),
}
}
}
#[cfg(test)]
mod tests {
use iox_tests::TestCatalog;
use iox_time::{MockProvider, Time};
use super::*;
#[test]
fn test_display() {
let scheduler = LocalScheduler::new(
LocalSchedulerConfig::default(),
BackoffConfig::default(),
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
);
assert_eq!(scheduler.to_string(), "local_compaction_scheduler",);
}
#[test]
fn test_display_with_sharding() {
let shard_config = Some(ShardConfig {
n_shards: 2,
shard_id: 1,
});
let config = LocalSchedulerConfig {
partitions_source_config: PartitionsSourceConfig::default(),
shard_config,
};
let scheduler = LocalScheduler::new(
config,
BackoffConfig::default(),
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
);
assert_eq!(
scheduler.to_string(),
"local_compaction_scheduler(shard_cfg(n_shards=2,shard_id=1))",
);
}
}


@ -12,7 +12,7 @@ pub(crate) mod shard;
///
/// This will usually be used BEFORE any parquet files for the given partition are fetched and hence is a quite
/// efficient filter stage.
pub trait IdOnlyPartitionFilter: Debug + Display + Send + Sync {
pub(crate) trait IdOnlyPartitionFilter: Debug + Display + Send + Sync {
/// Returns true if the partition should be included.
fn apply(&self, partition_id: PartitionId) -> bool;
}


@ -1,6 +1,5 @@
use std::{collections::HashSet, fmt::Display, time::Duration};
use clap_blocks::compactor_scheduler::PartitionSourceConfigForLocalScheduler;
use data_types::PartitionId;
/// Default threshold for hot partitions
@ -50,89 +49,3 @@ impl Default for PartitionsSourceConfig {
}
}
}
impl PartitionsSourceConfig {
/// Create a new [`PartitionsSourceConfig`] from the CLI|env config.
pub fn from_config(config: PartitionSourceConfigForLocalScheduler) -> PartitionsSourceConfig {
let PartitionSourceConfigForLocalScheduler {
partition_filter,
process_all_partitions,
compaction_partition_minute_threshold,
} = config;
match (partition_filter, process_all_partitions) {
(None, false) => PartitionsSourceConfig::CatalogRecentWrites {
threshold: Duration::from_secs(compaction_partition_minute_threshold * 60),
},
(None, true) => PartitionsSourceConfig::CatalogAll,
(Some(ids), false) => {
PartitionsSourceConfig::Fixed(ids.iter().cloned().map(PartitionId::new).collect())
}
(Some(_), true) => panic!(
"provided partition ID filter and specific 'process all', this does not make sense"
),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
#[should_panic(
expected = "provided partition ID filter and specific 'process all', this does not make sense"
)]
fn process_all_and_partition_filter_incompatible() {
let config = PartitionSourceConfigForLocalScheduler {
compaction_partition_minute_threshold: 10,
partition_filter: Some(vec![1, 7]),
process_all_partitions: true,
};
PartitionsSourceConfig::from_config(config);
}
#[test]
fn fixed_list_of_partitions() {
let config = PartitionSourceConfigForLocalScheduler {
compaction_partition_minute_threshold: 10,
partition_filter: Some(vec![1, 7]),
process_all_partitions: false,
};
let partitions_source_config = PartitionsSourceConfig::from_config(config);
assert_eq!(
partitions_source_config,
PartitionsSourceConfig::Fixed([PartitionId::new(1), PartitionId::new(7)].into())
);
}
#[test]
fn all_in_the_catalog() {
let config = PartitionSourceConfigForLocalScheduler {
compaction_partition_minute_threshold: 10,
partition_filter: None,
process_all_partitions: true,
};
let partitions_source_config = PartitionsSourceConfig::from_config(config);
assert_eq!(partitions_source_config, PartitionsSourceConfig::CatalogAll,);
}
#[test]
fn normal_compaction() {
let config = PartitionSourceConfigForLocalScheduler {
compaction_partition_minute_threshold: 10,
partition_filter: None,
process_all_partitions: false,
};
let partitions_source_config = PartitionsSourceConfig::from_config(config);
assert_eq!(
partitions_source_config,
PartitionsSourceConfig::CatalogRecentWrites {
threshold: Duration::from_secs(600)
},
);
}
}


@ -1,5 +1,3 @@
use clap_blocks::compactor_scheduler::ShardConfigForLocalScheduler;
/// Shard config.
/// Configured per LocalScheduler, which equates to per compactor.
#[derive(Debug, Clone)]
@ -14,35 +12,13 @@ pub struct ShardConfig {
pub shard_id: usize,
}
impl ShardConfig {
/// Create a new [`ShardConfig`] from a [`ShardConfigForLocalScheduler`].
pub fn from_config(config: ShardConfigForLocalScheduler) -> Option<Self> {
match (config.shard_count, config.shard_id, config.hostname) {
// if no shard_count is provided, then we are not sharding
(None, _, _) => None,
// always use the shard_id if provided
(Some(shard_count), Some(shard_id), _) => Some(ShardConfig {
shard_id,
n_shards: shard_count,
}),
// if no shard_id is provided, then we are sharding by hostname
(Some(shard_count), None, Some(hostname)) => {
let parsed_id = hostname
.chars()
.skip_while(|ch| !ch.is_ascii_digit())
.take_while(|ch| ch.is_ascii_digit())
.fold(None, |acc, ch| {
ch.to_digit(10).map(|b| acc.unwrap_or(0) * 10 + b)
});
assert!(parsed_id.is_some(), "hostname must end in a shard ID");
Some(ShardConfig {
shard_id: parsed_id.unwrap() as usize,
n_shards: shard_count,
})
}
(Some(_), None, None) => {
panic!("shard_count must be paired with either shard_id or hostname")
}
}
impl std::fmt::Display for ShardConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { n_shards, shard_id } = self;
write!(
f,
"shard_cfg(n_shards={:?},shard_id={:?})",
n_shards, shard_id
)
}
}


@ -0,0 +1,62 @@
use std::fmt::{Debug, Display};
use async_trait::async_trait;
use data_types::PartitionId;
use uuid::Uuid;
use crate::LocalSchedulerConfig;
/// Scheduler configuration.
#[derive(Debug, Clone)]
pub enum SchedulerConfig {
/// Configuration specific to the [`LocalScheduler`](crate::LocalScheduler).
Local(LocalSchedulerConfig),
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self::Local(LocalSchedulerConfig::default())
}
}
impl std::fmt::Display for SchedulerConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SchedulerConfig::Local(LocalSchedulerConfig {
shard_config,
partitions_source_config: _,
}) => match &shard_config {
None => write!(f, "local_compaction_scheduler"),
Some(shard_config) => write!(f, "local_compaction_scheduler({shard_config})",),
},
}
}
}
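// Illustrative note (not part of this diff) on the report information the commit message
// mentions: with a shard config present, the Display impl above renders e.g.
// "local_compaction_scheduler(shard_cfg(n_shards=2,shard_id=1))"; without one it renders
// plain "local_compaction_scheduler".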
/// Job assignment for a given partition.
#[derive(Debug)]
pub struct CompactionJob {
#[allow(dead_code)]
/// Unique identifier for this job.
/// Should not be the same as the partition id.
uuid: Uuid,
/// Leased partition.
pub partition_id: PartitionId,
}
impl CompactionJob {
/// Create new job.
pub fn new(partition_id: PartitionId) -> Self {
Self {
uuid: Uuid::new_v4(),
partition_id,
}
}
}
/// Core trait used for all schedulers.
#[async_trait]
pub trait Scheduler: Send + Sync + Debug + Display {
/// Get partitions to be compacted.
async fn get_jobs(&self) -> Vec<CompactionJob>;
}
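// Illustrative, not part of this diff: a hypothetical in-crate test sketching the
// uniqueness property called out in the commit message; two jobs for the same partition
// share the partition_id but never the job identifier.
#[cfg(test)]
mod compaction_job_identity_sketch {
    use super::*;

    #[test]
    fn jobs_for_the_same_partition_are_distinct() {
        let a = CompactionJob::new(PartitionId::new(1));
        let b = CompactionJob::new(PartitionId::new(1));
        assert_eq!(a.partition_id, b.partition_id);
        // job identity comes from a fresh UUID, not from the partition id
        assert_ne!(a.uuid, b.uuid);
    }
}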


@ -41,7 +41,7 @@ use compactor::{
compact, config::Config, hardcoded_components, Components, PanicDataFusionPlanner,
PartitionInfo,
};
use compactor_scheduler::PartitionsSourceConfig;
use compactor_scheduler::SchedulerConfig;
use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_util::config::register_iox_object_store;
@ -59,7 +59,6 @@ use schema::sort::SortKey;
use tracker::AsyncSemaphoreMetrics;
// Default values for the test setup builder
const PARTITION_THRESHOLD: Duration = Duration::from_secs(10 * 60); // 10min
const MAX_DESIRE_FILE_SIZE: u64 = 100 * 1024;
const PERCENTAGE_MAX_FILE_SIZE: u16 = 5;
const SPLIT_PERCENTAGE: u16 = 80;
@ -122,6 +121,7 @@ impl TestSetupBuilder<false> {
let config = Config {
metric_registry: catalog.metric_registry(),
catalog: catalog.catalog(),
scheduler_config: SchedulerConfig::default(),
parquet_store_real: catalog.parquet_store.clone(),
parquet_store_scratchpad: ParquetStorage::new(
Arc::new(object_store::memory::InMemory::new()),
@ -137,13 +137,9 @@ impl TestSetupBuilder<false> {
percentage_max_file_size: PERCENTAGE_MAX_FILE_SIZE,
split_percentage: SPLIT_PERCENTAGE,
partition_timeout: Duration::from_secs(3_600),
partitions_source: PartitionsSourceConfig::CatalogRecentWrites {
threshold: PARTITION_THRESHOLD,
},
shadow_mode: false,
enable_scratchpad: true,
ignore_partition_skip_marker: false,
shard_config: None,
min_num_l1_files_to_compact: MIN_NUM_L1_FILES_TO_COMPACT,
process_once: true,
simulate_without_object_store: false,


@ -486,6 +486,7 @@ impl Config {
// settings from other configs. Can't use `#clap(flatten)` as the
// parameters are redundant with ingester's
let compactor_config = CompactorConfig {
compactor_scheduler_config,
compaction_partition_concurrency: NonZeroUsize::new(1).unwrap(),
compaction_df_concurrency: NonZeroUsize::new(1).unwrap(),
compaction_partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(),
@ -523,7 +524,6 @@ impl Config {
ingester_run_config,
compactor_run_config,
compactor_scheduler_config,
catalog_dsn,
ingester_config,
@ -551,8 +551,6 @@ struct SpecializedConfig {
querier_run_config: RunConfig,
ingester_run_config: RunConfig,
compactor_run_config: RunConfig,
#[allow(dead_code)]
compactor_scheduler_config: CompactorSchedulerConfig,
catalog_dsn: CatalogDsnConfig,
ingester_config: IngesterConfig,
@ -567,7 +565,6 @@ pub async fn command(config: Config) -> Result<()> {
querier_run_config,
ingester_run_config,
compactor_run_config,
compactor_scheduler_config,
catalog_dsn,
ingester_config,
router_config,
@ -653,7 +650,6 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&exec),
Arc::clone(&time_provider),
compactor_config,
compactor_scheduler_config,
)
.await;


@ -3,8 +3,7 @@
use super::main;
use crate::process_info::setup_metric_registry;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig,
compactor_scheduler::CompactorSchedulerConfig, object_store::make_object_store,
catalog_dsn::CatalogDsnConfig, compactor::CompactorConfig, object_store::make_object_store,
run_config::RunConfig,
};
use compactor::object_store::metrics::MetricsStore;
@ -65,9 +64,6 @@ pub struct Config {
#[clap(flatten)]
pub(crate) compactor_config: CompactorConfig,
#[clap(flatten)]
compactor_scheduler_config: CompactorSchedulerConfig,
}
pub async fn command(config: Config) -> Result<(), Error> {
@ -130,7 +126,6 @@ pub async fn command(config: Config) -> Result<(), Error> {
exec,
time_provider,
config.compactor_config,
config.compactor_scheduler_config,
)
.await;


@ -11,6 +11,7 @@ backoff = { path = "../backoff" }
clap_blocks = { path = "../clap_blocks" }
compactor = { path = "../compactor" }
compactor_scheduler = { path = "../compactor_scheduler" }
data_types = { path = "../data_types" }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }


@ -10,15 +10,15 @@
missing_debug_implementations,
unused_crate_dependencies
)]
mod scheduler_config;
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
use async_trait::async_trait;
use backoff::BackoffConfig;
use clap_blocks::{compactor::CompactorConfig, compactor_scheduler::CompactorSchedulerConfig};
use clap_blocks::compactor::CompactorConfig;
use compactor::{compactor::Compactor, config::Config};
use compactor_scheduler::{PartitionsSourceConfig, ShardConfig};
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@ -41,6 +41,8 @@ use std::{
use tokio_util::sync::CancellationToken;
use trace::TraceCollector;
use crate::scheduler_config::convert_scheduler_config;
pub struct CompactorServerType {
compactor: Compactor,
metric_registry: Arc<Registry>,
@ -153,19 +155,15 @@ pub async fn create_compactor_server_type(
exec: Arc<Executor>,
time_provider: Arc<dyn TimeProvider>,
compactor_config: CompactorConfig,
// temporary dependency, until the rest of the code is moved over to the compactor_scheduler
compactor_scheduler_config: CompactorSchedulerConfig,
) -> Arc<dyn ServerType> {
let backoff_config = BackoffConfig::default();
let shard_config = ShardConfig::from_config(compactor_scheduler_config.shard_config);
let partitions_source =
PartitionsSourceConfig::from_config(compactor_scheduler_config.partition_source_config);
let compactor = Compactor::start(Config {
metric_registry: Arc::clone(&metric_registry),
catalog,
scheduler_config: convert_scheduler_config(
compactor_config.compactor_scheduler_config.clone(),
),
parquet_store_real,
parquet_store_scratchpad,
exec,
@ -179,11 +177,9 @@ pub async fn create_compactor_server_type(
percentage_max_file_size: compactor_config.percentage_max_file_size,
split_percentage: compactor_config.split_percentage,
partition_timeout: Duration::from_secs(compactor_config.partition_timeout_secs),
partitions_source,
shadow_mode: compactor_config.shadow_mode,
enable_scratchpad: compactor_config.enable_scratchpad,
ignore_partition_skip_marker: compactor_config.ignore_partition_skip_marker,
shard_config,
min_num_l1_files_to_compact: compactor_config.min_num_l1_files_to_compact,
process_once: compactor_config.process_once,
simulate_without_object_store: false,


@ -0,0 +1,138 @@
use std::time::Duration;
use clap_blocks::compactor_scheduler::{
CompactorSchedulerConfig, CompactorSchedulerType, PartitionSourceConfigForLocalScheduler,
ShardConfigForLocalScheduler,
};
use compactor_scheduler::{
LocalSchedulerConfig, PartitionsSourceConfig, SchedulerConfig, ShardConfig,
};
use data_types::PartitionId;
fn convert_partitions_source_config(
config: PartitionSourceConfigForLocalScheduler,
) -> PartitionsSourceConfig {
let PartitionSourceConfigForLocalScheduler {
partition_filter,
process_all_partitions,
compaction_partition_minute_threshold,
} = config;
match (partition_filter, process_all_partitions) {
(None, false) => PartitionsSourceConfig::CatalogRecentWrites {
threshold: Duration::from_secs(compaction_partition_minute_threshold * 60),
},
(None, true) => PartitionsSourceConfig::CatalogAll,
(Some(ids), false) => {
PartitionsSourceConfig::Fixed(ids.iter().cloned().map(PartitionId::new).collect())
}
(Some(_), true) => panic!(
"provided partition ID filter and specific 'process all', this does not make sense"
),
}
}
/// Create a new [`ShardConfig`] from a [`ShardConfigForLocalScheduler`].
fn convert_shard_config(config: ShardConfigForLocalScheduler) -> Option<ShardConfig> {
match (config.shard_count, config.shard_id, config.hostname) {
// if no shard_count is provided, then we are not sharding
(None, _, _) => None,
// always use the shard_id if provided
(Some(shard_count), Some(shard_id), _) => Some(ShardConfig {
shard_id,
n_shards: shard_count,
}),
// if no shard_id is provided, then we are sharding by hostname
(Some(shard_count), None, Some(hostname)) => {
let parsed_id = hostname
.chars()
.skip_while(|ch| !ch.is_ascii_digit())
.take_while(|ch| ch.is_ascii_digit())
.fold(None, |acc, ch| {
ch.to_digit(10).map(|b| acc.unwrap_or(0) * 10 + b)
});
assert!(parsed_id.is_some(), "hostname must end in a shard ID");
Some(ShardConfig {
shard_id: parsed_id.unwrap() as usize,
n_shards: shard_count,
})
}
(Some(_), None, None) => {
panic!("shard_count must be paired with either shard_id or hostname")
}
}
}
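// Illustrative, not part of this diff: the hostname branch above keeps the first run of
// ASCII digits it finds and uses it as the shard id. A standalone sketch of that parsing
// step (module, function, and test names here are hypothetical):
#[cfg(test)]
mod hostname_shard_id_sketch {
    fn parse_shard_id(hostname: &str) -> Option<u32> {
        hostname
            .chars()
            .skip_while(|ch| !ch.is_ascii_digit())
            .take_while(|ch| ch.is_ascii_digit())
            .fold(None, |acc, ch| ch.to_digit(10).map(|b| acc.unwrap_or(0) * 10 + b))
    }

    #[test]
    fn first_digit_run_becomes_the_shard_id() {
        assert_eq!(parse_shard_id("iox-shared-compactor-2"), Some(2));
        assert_eq!(parse_shard_id("compactor-10-west"), Some(10));
        // a hostname with no digits at all would trip the assert in convert_shard_config
        assert_eq!(parse_shard_id("no-digits-here"), None);
    }
}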
pub(crate) fn convert_scheduler_config(config: CompactorSchedulerConfig) -> SchedulerConfig {
match config.compactor_scheduler_type {
CompactorSchedulerType::Local => SchedulerConfig::Local(LocalSchedulerConfig {
partitions_source_config: convert_partitions_source_config(
config.partition_source_config,
),
shard_config: convert_shard_config(config.shard_config),
}),
CompactorSchedulerType::Remote => unimplemented!("Remote scheduler not implemented"),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
#[should_panic(
expected = "provided partition ID filter and specific 'process all', this does not make sense"
)]
fn process_all_and_partition_filter_incompatible() {
let config = PartitionSourceConfigForLocalScheduler {
compaction_partition_minute_threshold: 10,
partition_filter: Some(vec![1, 7]),
process_all_partitions: true,
};
convert_partitions_source_config(config);
}
#[test]
fn fixed_list_of_partitions() {
let config = PartitionSourceConfigForLocalScheduler {
compaction_partition_minute_threshold: 10,
partition_filter: Some(vec![1, 7]),
process_all_partitions: false,
};
let partitions_source_config = convert_partitions_source_config(config);
assert_eq!(
partitions_source_config,
PartitionsSourceConfig::Fixed([PartitionId::new(1), PartitionId::new(7)].into())
);
}
#[test]
fn all_in_the_catalog() {
let config = PartitionSourceConfigForLocalScheduler {
compaction_partition_minute_threshold: 10,
partition_filter: None,
process_all_partitions: true,
};
let partitions_source_config = convert_partitions_source_config(config);
assert_eq!(partitions_source_config, PartitionsSourceConfig::CatalogAll,);
}
#[test]
fn normal_compaction() {
let config = PartitionSourceConfigForLocalScheduler {
compaction_partition_minute_threshold: 10,
partition_filter: None,
process_all_partitions: false,
};
let partitions_source_config = convert_partitions_source_config(config);
assert_eq!(
partitions_source_config,
PartitionsSourceConfig::CatalogRecentWrites {
threshold: Duration::from_secs(600)
},
);
}
}