refactor: move PartitionsSourceConfig into local scheduler (#8026)

pull/24376/head
wiedld 2023-06-20 16:05:59 -07:00 committed by GitHub
parent 34b5fadde0
commit e29b453e0d
10 changed files with 156 additions and 142 deletions

Cargo.lock generated
View File

@@ -1018,6 +1018,7 @@ dependencies = [
  "async-trait",
  "backoff",
  "compactor",
+ "compactor_scheduler",
  "data_types",
  "datafusion",
  "datafusion_util",
@@ -2978,7 +2979,6 @@ dependencies = [
  "clap_blocks",
  "compactor",
  "compactor_scheduler",
- "data_types",
  "hyper",
  "iox_catalog",
  "iox_query",

View File

@@ -4,16 +4,12 @@
 use std::{sync::Arc, time::Duration};
-use compactor_scheduler::{MockPartitionsSource, PartitionsSource};
+use compactor_scheduler::{MockPartitionsSource, PartitionsSource, PartitionsSourceConfig};
 use data_types::CompactionLevel;
 use object_store::memory::InMemory;
 use observability_deps::tracing::info;
-use crate::{
-    config::{Config, PartitionsSourceConfig},
-    error::ErrorKind,
-    object_store::ignore_writes::IgnoreWrites,
-};
+use crate::{config::Config, error::ErrorKind, object_store::ignore_writes::IgnoreWrites};
 use super::{
     changed_files_filter::logging::LoggingChangedFiles,

View File

@@ -1,9 +1,8 @@
 //! Config-related stuff.
-use std::{collections::HashSet, fmt::Display, num::NonZeroUsize, sync::Arc, time::Duration};
+use std::{num::NonZeroUsize, sync::Arc, time::Duration};
 use backoff::BackoffConfig;
-use compactor_scheduler::ShardConfig;
-use data_types::PartitionId;
+use compactor_scheduler::{PartitionsSourceConfig, ShardConfig};
 use iox_catalog::interface::Catalog;
 use iox_query::exec::Executor;
 use iox_time::TimeProvider;
@@ -150,40 +149,3 @@ impl Config {
         self.max_desired_file_size_bytes as usize * MIN_COMPACT_SIZE_MULTIPLE
     }
 }
-
-/// Partitions source config.
-#[derive(Debug, Clone, PartialEq)]
-pub enum PartitionsSourceConfig {
-    /// For "hot" compaction: use the catalog to determine which partitions have recently received
-    /// writes, defined as having a new Parquet file created within the last `threshold`.
-    CatalogRecentWrites {
-        /// The amount of time ago to look for Parquet file creations
-        threshold: Duration,
-    },
-
-    /// Use all partitions from the catalog.
-    ///
-    /// This does NOT consider if/when a partition received any writes.
-    CatalogAll,
-
-    /// Use a fixed set of partitions.
-    ///
-    /// This is mostly useful for debugging.
-    Fixed(HashSet<PartitionId>),
-}
-
-impl Display for PartitionsSourceConfig {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            Self::CatalogRecentWrites { threshold } => {
-                write!(f, "catalog_recent_writes({threshold:?})")
-            }
-            Self::CatalogAll => write!(f, "catalog_all"),
-            Self::Fixed(p_ids) => {
-                let mut p_ids = p_ids.iter().copied().collect::<Vec<_>>();
-                p_ids.sort();
-                write!(f, "fixed({p_ids:?})")
-            }
-        }
-    }
-}

View File

@@ -25,5 +25,7 @@ mod local_scheduler;
 mod partitions_source;
 pub use partitions_source::*;
-// Temporary export. Will eventually be encapsulated in local_scheduler.
-pub use local_scheduler::shard_config::ShardConfig;
+// Temporary exports. Will eventually be encapsulated in local_scheduler.
+pub use local_scheduler::{
+    partitions_source_config::PartitionsSourceConfig, shard_config::ShardConfig,
+};
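With PartitionsSourceConfig now (temporarily) re-exported from the compactor_scheduler crate root, downstream crates swap the old compactor-config import for the scheduler one, as the other files in this commit do. A minimal sketch of the change at a typical call site (illustrative only, not part of the diff):

// before this commit the type lived in the compactor's config module:
// use compactor::config::{Config, PartitionsSourceConfig};
// after this commit it comes from compactor_scheduler instead:
use compactor::config::Config;
use compactor_scheduler::PartitionsSourceConfig;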

View File

@@ -1,4 +1,5 @@
 //! Internals used by [`LocalScheduler`].
+pub(crate) mod partitions_source_config;
 pub(crate) mod shard_config;
 #[allow(dead_code)]

View File

@@ -0,0 +1,138 @@
+use std::{collections::HashSet, fmt::Display, time::Duration};
+
+use clap_blocks::compactor_scheduler::PartitionSourceConfigForLocalScheduler;
+use data_types::PartitionId;
+
+/// Default threshold for hot partitions
+const DEFAULT_PARTITION_MINUTE_THRESHOLD: u64 = 10;
+
+/// Partitions source config.
+#[derive(Debug, Clone, PartialEq)]
+pub enum PartitionsSourceConfig {
+    /// For "hot" compaction: use the catalog to determine which partitions have recently received
+    /// writes, defined as having a new Parquet file created within the last `threshold`.
+    CatalogRecentWrites {
+        /// The amount of time ago to look for Parquet file creations
+        threshold: Duration,
+    },
+
+    /// Use all partitions from the catalog.
+    ///
+    /// This does NOT consider if/when a partition received any writes.
+    CatalogAll,
+
+    /// Use a fixed set of partitions.
+    ///
+    /// This is mostly useful for debugging.
+    Fixed(HashSet<PartitionId>),
+}
+
+impl Display for PartitionsSourceConfig {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::CatalogRecentWrites { threshold } => {
+                write!(f, "catalog_recent_writes({threshold:?})")
+            }
+            Self::CatalogAll => write!(f, "catalog_all"),
+            Self::Fixed(p_ids) => {
+                let mut p_ids = p_ids.iter().copied().collect::<Vec<_>>();
+                p_ids.sort();
+                write!(f, "fixed({p_ids:?})")
+            }
+        }
+    }
+}
+
+impl Default for PartitionsSourceConfig {
+    fn default() -> Self {
+        Self::CatalogRecentWrites {
+            threshold: Duration::from_secs(DEFAULT_PARTITION_MINUTE_THRESHOLD * 60),
+        }
+    }
+}
+
+impl PartitionsSourceConfig {
+    /// Create a new [`PartitionsSourceConfig`] from the CLI|env config.
+    pub fn from_config(config: PartitionSourceConfigForLocalScheduler) -> PartitionsSourceConfig {
+        let PartitionSourceConfigForLocalScheduler {
+            partition_filter,
+            process_all_partitions,
+            compaction_partition_minute_threshold,
+        } = config;
+
+        match (partition_filter, process_all_partitions) {
+            (None, false) => PartitionsSourceConfig::CatalogRecentWrites {
+                threshold: Duration::from_secs(compaction_partition_minute_threshold * 60),
+            },
+            (None, true) => PartitionsSourceConfig::CatalogAll,
+            (Some(ids), false) => {
+                PartitionsSourceConfig::Fixed(ids.iter().cloned().map(PartitionId::new).collect())
+            }
+            (Some(_), true) => panic!(
+                "provided partition ID filter and specific 'process all', this does not make sense"
+            ),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    #[should_panic(
+        expected = "provided partition ID filter and specific 'process all', this does not make sense"
+    )]
+    fn process_all_and_partition_filter_incompatible() {
+        let config = PartitionSourceConfigForLocalScheduler {
+            compaction_partition_minute_threshold: 10,
+            partition_filter: Some(vec![1, 7]),
+            process_all_partitions: true,
+        };
+        PartitionsSourceConfig::from_config(config);
+    }
+
+    #[test]
+    fn fixed_list_of_partitions() {
+        let config = PartitionSourceConfigForLocalScheduler {
+            compaction_partition_minute_threshold: 10,
+            partition_filter: Some(vec![1, 7]),
+            process_all_partitions: false,
+        };
+        let partitions_source_config = PartitionsSourceConfig::from_config(config);
+
+        assert_eq!(
+            partitions_source_config,
+            PartitionsSourceConfig::Fixed([PartitionId::new(1), PartitionId::new(7)].into())
+        );
+    }
+
+    #[test]
+    fn all_in_the_catalog() {
+        let config = PartitionSourceConfigForLocalScheduler {
+            compaction_partition_minute_threshold: 10,
+            partition_filter: None,
+            process_all_partitions: true,
+        };
+        let partitions_source_config = PartitionsSourceConfig::from_config(config);
+
+        assert_eq!(partitions_source_config, PartitionsSourceConfig::CatalogAll,);
+    }
+
+    #[test]
+    fn normal_compaction() {
+        let config = PartitionSourceConfigForLocalScheduler {
+            compaction_partition_minute_threshold: 10,
+            partition_filter: None,
+            process_all_partitions: false,
+        };
+        let partitions_source_config = PartitionsSourceConfig::from_config(config);
+
+        assert_eq!(
+            partitions_source_config,
+            PartitionsSourceConfig::CatalogRecentWrites {
+                threshold: Duration::from_secs(600)
+            },
+        );
+    }
+}
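The tests in the new module exercise from_config but not the Default and Display impls. A minimal illustrative sketch (not part of the commit) of what those produce, assuming the crate-root re-export added above:

use std::time::Duration;

use compactor_scheduler::PartitionsSourceConfig;

fn main() {
    // Default falls back to "catalog recent writes" with the 10-minute threshold.
    let default_config = PartitionsSourceConfig::default();
    assert_eq!(
        default_config,
        PartitionsSourceConfig::CatalogRecentWrites {
            threshold: Duration::from_secs(600),
        }
    );

    // Display renders a compact form for logging; the threshold uses Duration's
    // Debug formatting, e.g. "600s".
    assert_eq!(default_config.to_string(), "catalog_recent_writes(600s)");
    assert_eq!(PartitionsSourceConfig::CatalogAll.to_string(), "catalog_all");
}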

View File

@@ -10,6 +10,7 @@ license.workspace = true
 async-trait = "0.1.68"
 backoff = { path = "../backoff" }
 compactor = { path = "../compactor" }
+compactor_scheduler = { path = "../compactor_scheduler" }
 data_types = { path = "../data_types" }
 datafusion = { workspace = true }
 datafusion_util = { path = "../datafusion_util" }

View File

@@ -38,10 +38,10 @@ use crate::{
 use async_trait::async_trait;
 use backoff::BackoffConfig;
 use compactor::{
-    compact,
-    config::{Config, PartitionsSourceConfig},
-    hardcoded_components, Components, PanicDataFusionPlanner, PartitionInfo,
+    compact, config::Config, hardcoded_components, Components, PanicDataFusionPlanner,
+    PartitionInfo,
 };
+use compactor_scheduler::PartitionsSourceConfig;
 use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion_util::config::register_iox_object_store;

View File

@@ -11,7 +11,6 @@ backoff = { path = "../backoff" }
 clap_blocks = { path = "../clap_blocks" }
 compactor = { path = "../compactor" }
 compactor_scheduler = { path = "../compactor_scheduler" }
-data_types = { path = "../data_types" }
 hyper = "0.14"
 iox_catalog = { path = "../iox_catalog" }
 iox_time = { path = "../iox_time" }

View File

@@ -17,12 +17,8 @@ use workspace_hack as _;
 use async_trait::async_trait;
 use backoff::BackoffConfig;
 use clap_blocks::{compactor::CompactorConfig, compactor_scheduler::CompactorSchedulerConfig};
-use compactor::{
-    compactor::Compactor,
-    config::{Config, PartitionsSourceConfig},
-};
-use compactor_scheduler::ShardConfig;
-use data_types::PartitionId;
+use compactor::{compactor::Compactor, config::Config};
+use compactor_scheduler::{PartitionsSourceConfig, ShardConfig};
 use hyper::{Body, Request, Response};
 use iox_catalog::interface::Catalog;
 use iox_query::exec::Executor;
@@ -164,18 +160,8 @@ pub async fn create_compactor_server_type(
     let shard_config = ShardConfig::from_config(compactor_scheduler_config.shard_config);
-    let partitions_source = create_partition_source_config(
-        compactor_scheduler_config
-            .partition_source_config
-            .partition_filter
-            .as_deref(),
-        compactor_scheduler_config
-            .partition_source_config
-            .process_all_partitions,
-        compactor_scheduler_config
-            .partition_source_config
-            .compaction_partition_minute_threshold,
-    );
+    let partitions_source =
+        PartitionsSourceConfig::from_config(compactor_scheduler_config.partition_source_config);

     let compactor = Compactor::start(Config {
@@ -214,74 +200,3 @@ pub async fn create_compactor_server_type(
         common_state,
     ))
 }
-
-fn create_partition_source_config(
-    partition_filter: Option<&[i64]>,
-    process_all_partitions: bool,
-    compaction_partition_minute_threshold: u64,
-) -> PartitionsSourceConfig {
-    match (partition_filter, process_all_partitions) {
-        (None, false) => PartitionsSourceConfig::CatalogRecentWrites {
-            threshold: Duration::from_secs(compaction_partition_minute_threshold * 60),
-        },
-        (None, true) => PartitionsSourceConfig::CatalogAll,
-        (Some(ids), false) => {
-            PartitionsSourceConfig::Fixed(ids.iter().cloned().map(PartitionId::new).collect())
-        }
-        (Some(_), true) => panic!(
-            "provided partition ID filter and specific 'process all', this does not make sense"
-        ),
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    #[should_panic(
-        expected = "provided partition ID filter and specific 'process all', this does not make sense"
-    )]
-    fn process_all_and_partition_filter_incompatible() {
-        create_partition_source_config(
-            Some(&[1, 7]),
-            true,
-            10, // arbitrary
-        );
-    }
-
-    #[test]
-    fn fixed_list_of_partitions() {
-        let partitions_source_config = create_partition_source_config(
-            Some(&[1, 7]),
-            false,
-            10, // arbitrary
-        );
-
-        assert_eq!(
-            partitions_source_config,
-            PartitionsSourceConfig::Fixed([PartitionId::new(1), PartitionId::new(7)].into())
-        );
-    }
-
-    #[test]
-    fn all_in_the_catalog() {
-        let partitions_source_config = create_partition_source_config(
-            None, true, 10, // arbitrary
-        );
-
-        assert_eq!(partitions_source_config, PartitionsSourceConfig::CatalogAll,);
-    }
-
-    #[test]
-    fn hot_compaction() {
-        let partitions_source_config = create_partition_source_config(None, false, 10);
-
-        assert_eq!(
-            partitions_source_config,
-            PartitionsSourceConfig::CatalogRecentWrites {
-                threshold: Duration::from_secs(600)
-            },
-        );
-    }
-}