Merge pull request #7530 from influxdata/cn/cold-vs-hot-cli

feat: Cold compaction CLI that only selects partitions and logs them
pull/24376/head
kodiakhq[bot] 2023-04-19 16:40:50 +00:00 committed by GitHub
commit 63b51fdd50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 491 additions and 73 deletions

View File

@ -2,9 +2,50 @@
use std::num::NonZeroUsize;
/// Compaction type.
///
/// Selected on the command line via `--compaction-type` (or the
/// `INFLUXDB_IOX_COMPACTION_TYPE` environment variable); the accepted values are `hot` and
/// `cold`, with `hot` being the default.
//
// NOTE(review): clap's `ValueEnum` derive appears to surface the variant doc comments as help
// text for the possible values, so they are deliberately left untouched here.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
pub enum CompactionType {
    /// Compacts recent writes as they come in.
    #[default]
    Hot,

    /// Compacts partitions that have not been written to very recently for longer-term storage.
    Cold,
}
/// CLI config for compactor2
#[derive(Debug, Clone, clap::Parser)]
pub struct Compactor2Config {
/// Type of compaction to perform.
#[clap(
value_enum,
long = "compaction-type",
env = "INFLUXDB_IOX_COMPACTION_TYPE",
default_value = "hot",
action
)]
pub compaction_type: CompactionType,
/// When in "hot" compaction mode, the compactor will only consider compacting partitions that
/// have new Parquet files created within this many minutes.
#[clap(
long = "compaction_partition_minute_threshold",
env = "INFLUXDB_IOX_COMPACTION_PARTITION_MINUTE_THRESHOLD",
default_value = "10",
action
)]
pub compaction_partition_minute_threshold: u64,
/// When in "cold" compaction mode, the compactor will only consider compacting partitions that
/// have had no new Parquet files created in at least this many minutes.
#[clap(
long = "compaction_cold_partition_minute_threshold",
env = "INFLUXDB_IOX_COMPACTION_COLD_PARTITION_MINUTE_THRESHOLD",
default_value = "60",
action
)]
pub compaction_cold_partition_minute_threshold: u64,
/// Number of partitions that should be compacted in parallel.
///
/// This should usually be larger than the compaction job
@ -40,16 +81,6 @@ pub struct Compactor2Config {
)]
pub compaction_partition_scratchpad_concurrency: NonZeroUsize,
/// The compactor will only consider compacting partitions that
/// have new parquet files created within this many minutes.
#[clap(
long = "compaction_partition_minute_threshold",
env = "INFLUXDB_IOX_COMPACTION_PARTITION_MINUTE_THRESHOLD",
default_value = "10",
action
)]
pub compaction_partition_minute_threshold: u64,
/// Number of threads to use for the compactor query execution,
/// compaction and persistence.
/// If not specified, defaults to one less than the number of cores on the system
@ -274,3 +305,42 @@ pub struct Compactor2Config {
)]
pub max_num_columns_per_table: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;
    use test_helpers::assert_contains;

    /// Parses a command line consisting of the binary name plus `--compaction-type <value>`.
    fn parse_with_compaction_type(value: &str) -> Result<Compactor2Config, clap::Error> {
        Compactor2Config::try_parse_from(["my_binary", "--compaction-type", value])
    }

    /// Without any flag the compactor runs hot compaction.
    #[test]
    fn default_compaction_type_is_hot() {
        let parsed = Compactor2Config::try_parse_from(["my_binary"]).unwrap();

        assert_eq!(parsed.compaction_type, CompactionType::Hot);
    }

    /// `hot` may be requested explicitly.
    #[test]
    fn can_specify_hot() {
        let parsed = parse_with_compaction_type("hot").unwrap();

        assert_eq!(parsed.compaction_type, CompactionType::Hot);
    }

    /// `cold` switches the compactor to cold compaction.
    #[test]
    fn can_specify_cold() {
        let parsed = parse_with_compaction_type("cold").unwrap();

        assert_eq!(parsed.compaction_type, CompactionType::Cold);
    }

    /// Anything other than `hot` / `cold` is rejected with a helpful message.
    #[test]
    fn any_other_compaction_type_string_is_invalid() {
        let message = parse_with_compaction_type("hello")
            .unwrap_err()
            .to_string();

        assert_contains!(
            &message,
            "invalid value 'hello' for '--compaction-type <COMPACTION_TYPE>'"
        );
        assert_contains!(&message, "[possible values: hot, cold]");
    }
}

View File

@ -8,7 +8,7 @@ use data_types::CompactionLevel;
use object_store::memory::InMemory;
use crate::{
config::{Config, PartitionsSourceConfig},
config::{CompactionType, Config, PartitionsSourceConfig},
error::ErrorKind,
object_store::ignore_writes::IgnoreWrites,
};
@ -125,12 +125,22 @@ fn make_partitions_source_commit_partition_sink(
Arc<dyn PartitionDoneSink>,
) {
let partitions_source: Arc<dyn PartitionsSource> = match &config.partitions_source {
PartitionsSourceConfig::CatalogRecentWrites => {
PartitionsSourceConfig::CatalogRecentWrites { threshold } => {
Arc::new(CatalogToCompactPartitionsSource::new(
config.backoff_config.clone(),
Arc::clone(&config.catalog),
config.partition_threshold,
None, // Recent writes is `partition_threshold` ago to now
*threshold,
None, // Recent writes is `threshold` ago to now
Arc::clone(&config.time_provider),
))
}
PartitionsSourceConfig::CatalogColdForWrites { threshold } => {
Arc::new(CatalogToCompactPartitionsSource::new(
config.backoff_config.clone(),
Arc::clone(&config.catalog),
// Cold for writes is `threshold * 3` ago to `threshold` ago
*threshold * 3,
Some(*threshold),
Arc::clone(&config.time_provider),
))
}
@ -156,7 +166,10 @@ fn make_partitions_source_commit_partition_sink(
partitions_source,
);
let partition_done_sink: Arc<dyn PartitionDoneSink> = if config.shadow_mode {
// Temporarily do nothing for cold compaction until we check the cold compaction selection.
let shadow_mode = config.shadow_mode || config.compaction_type == CompactionType::Cold;
let partition_done_sink: Arc<dyn PartitionDoneSink> = if shadow_mode {
Arc::new(MockPartitionDoneSink::new())
} else {
Arc::new(CatalogPartitionDoneSink::new(
@ -165,7 +178,7 @@ fn make_partitions_source_commit_partition_sink(
))
};
let commit: Arc<dyn Commit> = if config.shadow_mode {
let commit: Arc<dyn Commit> = if shadow_mode {
Arc::new(MockCommit::new())
} else {
Arc::new(CatalogCommit::new(
@ -219,15 +232,18 @@ fn make_partitions_source_commit_partition_sink(
MetricsPartitionDoneSinkWrapper::new(partition_done_sink, &config.metric_registry),
));
// Note: Place "not empty" wrapper at the very last so that the logging and metric wrapper work even when there
// is not data.
let partitions_source =
LoggingPartitionsSourceWrapper::new(MetricsPartitionsSourceWrapper::new(
// Note: Place the "not empty" wrapper at the very last so that the logging and metric wrappers
// work even when there is no data.
let partitions_source = LoggingPartitionsSourceWrapper::new(
config.compaction_type,
MetricsPartitionsSourceWrapper::new(
RandomizeOrderPartitionsSourcesWrapper::new(partitions_source, 1234),
&config.metric_registry,
));
),
);
let partitions_source: Arc<dyn PartitionsSource> = if config.process_once {
// do not wrap into the "not empty" filter because we do NOT wanna throttle in this case but just exit early
// do not wrap into the "not empty" filter because we do NOT wanna throttle in this case
// but just exit early
Arc::new(partitions_source)
} else {
Arc::new(NotEmptyPartitionsSourceWrapper::new(

View File

@ -5,12 +5,14 @@ use data_types::PartitionId;
use observability_deps::tracing::{info, warn};
use super::PartitionsSource;
use crate::config::CompactionType;
/// Wrapper around another [`PartitionsSource`] that logs the result of every fetch.
///
/// Fetching delegates to `inner` and then emits an `info!` event carrying the compaction type
/// and the number of partitions returned; an additional `warn!` event is emitted when the inner
/// source returned no partitions at all. The fetched partitions are passed through unchanged.
#[derive(Debug)]
pub struct LoggingPartitionsSourceWrapper<T>
where
T: PartitionsSource,
{
/// Compaction type attached to every log event so that log lines of different compactor
/// modes can be told apart.
compaction_type: CompactionType,
/// The wrapped source that actually produces the partitions.
inner: T,
}
@ -18,8 +20,11 @@ impl<T> LoggingPartitionsSourceWrapper<T>
where
T: PartitionsSource,
{
pub fn new(inner: T) -> Self {
Self { inner }
pub fn new(compaction_type: CompactionType, inner: T) -> Self {
Self {
compaction_type,
inner,
}
}
}
@ -39,9 +44,13 @@ where
{
async fn fetch(&self) -> Vec<PartitionId> {
let partitions = self.inner.fetch().await;
info!(n_partitions = partitions.len(), "Fetch partitions",);
info!(
compaction_type = ?self.compaction_type,
n_partitions = partitions.len(),
"Fetch partitions",
);
if partitions.is_empty() {
warn!("No partition found",);
warn!(compaction_type = ?self.compaction_type, "No partition found",);
}
partitions
}
@ -57,37 +66,66 @@ mod tests {
#[test]
fn test_display() {
let source = LoggingPartitionsSourceWrapper::new(MockPartitionsSource::new(vec![]));
let source = LoggingPartitionsSourceWrapper::new(
CompactionType::Hot,
MockPartitionsSource::new(vec![]),
);
assert_eq!(source.to_string(), "logging(mock)",);
}
#[tokio::test]
async fn test_fetch_empty() {
let source = LoggingPartitionsSourceWrapper::new(MockPartitionsSource::new(vec![]));
let source = LoggingPartitionsSourceWrapper::new(
CompactionType::Hot,
MockPartitionsSource::new(vec![]),
);
let capture = TracingCapture::new();
assert_eq!(source.fetch().await, vec![],);
// logs the normal log message (so it's easy to search for every single call) but also an extra warning
assert_eq!(
capture.to_string(),
"level = INFO; message = Fetch partitions; n_partitions = 0; \nlevel = WARN; message = No partition found; ",
"level = INFO; message = Fetch partitions; compaction_type = Hot; n_partitions = 0; \
\nlevel = WARN; message = No partition found; compaction_type = Hot; ",
);
}
#[tokio::test]
async fn test_fetch_some() {
async fn test_fetch_some_hot() {
let p_1 = PartitionId::new(5);
let p_2 = PartitionId::new(1);
let p_3 = PartitionId::new(12);
let partitions = vec![p_1, p_2, p_3];
let source =
LoggingPartitionsSourceWrapper::new(MockPartitionsSource::new(partitions.clone()));
let source = LoggingPartitionsSourceWrapper::new(
CompactionType::Hot,
MockPartitionsSource::new(partitions.clone()),
);
let capture = TracingCapture::new();
assert_eq!(source.fetch().await, partitions,);
// just the ordinary log message, no warning
assert_eq!(
capture.to_string(),
"level = INFO; message = Fetch partitions; n_partitions = 3; ",
"level = INFO; message = Fetch partitions; compaction_type = Hot; n_partitions = 3; ",
);
}
/// A non-empty fetch in cold mode logs exactly one INFO event tagged with `Cold`.
#[tokio::test]
async fn test_fetch_some_cold() {
    let partitions = Vec::from([5, 1, 12].map(PartitionId::new));
    let inner = MockPartitionsSource::new(partitions.clone());
    let source = LoggingPartitionsSourceWrapper::new(CompactionType::Cold, inner);

    let capture = TracingCapture::new();
    let fetched = source.fetch().await;
    assert_eq!(fetched, partitions,);

    // just the ordinary log message, no warning
    let logged = capture.to_string();
    assert_eq!(
        logged,
        "level = INFO; message = Fetch partitions; compaction_type = Cold; n_partitions = 3; ",
    );
}
}

View File

@ -10,6 +10,7 @@ use super::Components;
pub fn log_config(config: &Config) {
// use struct unpack so we don't forget any members
let Config {
compaction_type,
shard_id,
// no need to print the internal state of the registry
metric_registry: _,
@ -22,7 +23,6 @@ pub fn log_config(config: &Config) {
partition_concurrency,
job_concurrency,
partition_scratchpad_concurrency,
partition_threshold,
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,
@ -58,6 +58,7 @@ pub fn log_config(config: &Config) {
let commit_wrapper = commit_wrapper.as_ref().map(|_| "Some").unwrap_or("None");
info!(
?compaction_type,
shard_id=shard_id.get(),
%catalog,
%parquet_store_real,
@ -68,7 +69,6 @@ pub fn log_config(config: &Config) {
partition_concurrency=partition_concurrency.get(),
job_concurrency=job_concurrency.get(),
partition_scratchpad_concurrency=partition_scratchpad_concurrency.get(),
partition_threshold_secs=partition_threshold.as_secs_f32(),
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,

View File

@ -19,6 +19,9 @@ const MIN_COMPACT_SIZE_MULTIPLE: usize = 3;
/// Config to set up a compactor.
#[derive(Debug, Clone)]
pub struct Config {
/// Compaction type.
pub compaction_type: CompactionType,
/// Shard Id
pub shard_id: ShardId,
@ -58,9 +61,6 @@ pub struct Config {
/// Number of jobs PER PARTITION that move files in and out of the scratchpad.
pub partition_scratchpad_concurrency: NonZeroUsize,
/// Partitions with recent created files this recent duration are selected for compaction.
pub partition_threshold: Duration,
/// Desired max size of compacted parquet files
/// It is a target desired value than a guarantee
pub max_desired_file_size_bytes: u64,
@ -210,11 +210,34 @@ pub struct ShardConfig {
pub shard_id: usize,
}
/// Compaction type.
///
/// Determines which mode the compactor runs in. [`CompactionType::Hot`] is the default.
///
/// The enum is a fieldless `Copy` type, so equality is total and `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CompactionType {
    /// Compacts recent writes as they come in.
    #[default]
    Hot,

    /// Compacts partitions that have not been written to very recently for longer-term storage.
    Cold,
}
/// Partitions source config.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum PartitionsSourceConfig {
/// Use the catalog to determine which partitions have recently received writes.
CatalogRecentWrites,
/// For "hot" compaction: use the catalog to determine which partitions have recently received
/// writes, defined as having a new Parquet file created within the last `threshold`.
CatalogRecentWrites {
/// The amount of time ago to look for Parquet file creations
threshold: Duration,
},
/// For "cold" compaction: use the catalog to determine which partitions have gone cold for
/// writing and should undergo final compaction, defined as having no new Parquet files created
/// in at least the last `threshold`.
CatalogColdForWrites {
/// The amount of time ago the last Parquet file creation must have happened
threshold: Duration,
},
/// Use all partitions from the catalog.
///
@ -230,7 +253,12 @@ pub enum PartitionsSourceConfig {
impl Display for PartitionsSourceConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::CatalogRecentWrites => write!(f, "catalog_recent_writes"),
Self::CatalogRecentWrites { threshold } => {
write!(f, "catalog_recent_writes({threshold:?})")
}
Self::CatalogColdForWrites { threshold } => {
write!(f, "catalog_cold_for_writes({threshold:?})")
}
Self::CatalogAll => write!(f, "catalog_all"),
Self::Fixed(p_ids) => {
let mut p_ids = p_ids.iter().copied().collect::<Vec<_>>();

View File

@ -1,7 +1,8 @@
use arrow_util::assert_batches_sorted_eq;
use compactor2_test_utils::{format_files, list_object_store, TestSetup, TestSetupBuilder};
use data_types::{CompactionLevel, ParquetFile, PartitionId};
use compactor2_test_utils::{format_files, list_object_store, TestSetup};
use iox_tests::TestParquetFileBuilder;
use test_helpers::{assert_contains, tracing::TracingCapture};
mod layouts;
@ -525,6 +526,146 @@ async fn test_shadow_mode_partition_fail() {
assert_skipped_compactions(&setup, []).await;
}
/// Builds the [`TestSetupBuilder`] shared by the cold compaction tests.
///
/// The settings are meant to mirror, as closely as possible, how cold compaction is configured
/// on production systems.
pub(crate) async fn cold_layout_setup_builder() -> TestSetupBuilder<false> {
    // 100 MiB
    let max_desired_file_size_bytes = 100 * 1024 * 1024;

    let cold_builder = TestSetup::builder().await.for_cold_compaction();

    cold_builder
        .with_percentage_max_file_size(20)
        .with_split_percentage(80)
        .with_max_num_files_per_plan(200)
        .with_min_num_l1_files_to_compact(10)
        .with_max_desired_file_size_bytes(max_desired_file_size_bytes)
        .simulate_without_object_store()
}
/// Cold compaction currently only *selects* partitions and logs them: a partition whose files
/// were created about two hours ago must show up in the logs, but nothing may be changed in the
/// catalog or the object store.
#[tokio::test]
async fn cold_compaction_logs() {
    test_helpers::maybe_start_logging();
    let logging_capture = TracingCapture::new();

    let setup = cold_layout_setup_builder().await.build().await;
    let time_provider = setup.catalog.time_provider();

    // Creating Parquet files with creation times around 2 hours ago is what makes this partition
    // selected for cold compaction
    let base_created_at_minutes_ago = 2 * 60;

    let file_template = TestParquetFileBuilder::default()
        .with_compaction_level(CompactionLevel::Initial)
        // need some LP to generate the schema
        .with_line_protocol("table,tag1=A,tag2=B,tag3=C field_int=1i 100");

    // (min time, max time, compaction level, minutes created *after* the base creation time)
    let file_specs = [
        (4, 7, CompactionLevel::Final, 0),
        (1, 5, CompactionLevel::FileNonOverlapped, 1),
        (6, 10, CompactionLevel::FileNonOverlapped, 2),
        (2, 4, CompactionLevel::Initial, 3),
    ];
    for (min_time, max_time, level, minutes_after_base) in file_specs {
        let created_at = time_provider.minutes_ago(base_created_at_minutes_ago - minutes_after_base);
        let file = file_template
            .clone()
            .with_min_time(min_time)
            .with_max_time(max_time)
            .with_compaction_level(level)
            .with_creation_time(created_at);
        setup.partition.create_parquet_file(file).await;
    }

    // Make sure we have set up some files
    let catalog_before = setup.list_by_table_not_to_delete().await;
    assert!(!catalog_before.is_empty());
    let object_store_before = list_object_store(&setup.catalog.object_store).await;
    assert!(!object_store_before.is_empty());

    setup.run_compact().await;

    // Make sure we're *NOT* actually compacting anything currently, nor are we skipping
    let catalog_after = setup.list_by_table_not_to_delete().await;
    assert_eq!(catalog_before, catalog_after);
    let object_store_after = list_object_store(&setup.catalog.object_store).await;
    assert_eq!(object_store_before, object_store_after);
    assert_skipped_compactions(&setup, []).await;

    // Make sure we *ARE* logging
    let captured = logging_capture.to_string();
    assert_contains!(
        &captured,
        "level = INFO; message = Fetch partitions; compaction_type = Cold; n_partitions = 1;"
    );
    assert_contains!(
        &captured,
        "level = INFO; message = compact partition; partition_id = 1;"
    );
}
/// Partitions laid out by `layouts::all_overlapping_l0_files` must not be picked up by cold
/// compaction: the fetch logs zero partitions plus the "No partition found" warning, and
/// neither the catalog nor the object store is modified.
#[tokio::test]
async fn cold_compaction_doesnt_find_hot_partitions() {
    test_helpers::maybe_start_logging();
    let logging_capture = TracingCapture::new();

    let empty_setup = cold_layout_setup_builder().await.build().await;
    let setup = layouts::all_overlapping_l0_files(empty_setup).await;

    // Make sure we have set up some files
    let catalog_before = setup.list_by_table_not_to_delete().await;
    assert!(!catalog_before.is_empty());
    let object_store_before = list_object_store(&setup.catalog.object_store).await;
    assert!(!object_store_before.is_empty());

    setup.run_compact().await;

    // Make sure we're *NOT* actually compacting anything currently, nor are we skipping
    let catalog_after = setup.list_by_table_not_to_delete().await;
    assert_eq!(catalog_before, catalog_after);
    let object_store_after = list_object_store(&setup.catalog.object_store).await;
    assert_eq!(object_store_before, object_store_after);
    assert_skipped_compactions(&setup, []).await;

    // Make sure we *ARE* logging
    let captured = logging_capture.to_string();
    assert_contains!(
        &captured,
        "level = INFO; message = Fetch partitions; compaction_type = Cold; n_partitions = 0;"
    );
    assert_contains!(
        &captured,
        "level = WARN; message = No partition found; compaction_type = Cold;"
    );
}
#[track_caller]
fn assert_levels<'a>(
files: impl IntoIterator<Item = &'a ParquetFile>,

View File

@ -15,12 +15,8 @@
mod commit_wrapper;
mod display;
mod simulator;
use commit_wrapper::{CommitRecorderBuilder, InvariantCheck};
pub use display::{display_format, display_size, format_files, format_files_split};
use iox_catalog::interface::Catalog;
use iox_query::exec::ExecutorType;
use simulator::ParquetFileSimulator;
use tracker::AsyncSemaphoreMetrics;
use std::{
collections::HashSet,
@ -30,12 +26,23 @@ use std::{
time::Duration,
};
use crate::{
commit_wrapper::{CommitRecorderBuilder, InvariantCheck},
simulator::ParquetFileSimulator,
};
use async_trait::async_trait;
use backoff::BackoffConfig;
use compactor2::{
compact,
config::{CompactionType, Config, PartitionsSourceConfig},
hardcoded_components, Components, PanicDataFusionPlanner, PartitionInfo,
};
use data_types::{ColumnType, CompactionLevel, ParquetFile, TableId, TRANSITION_SHARD_NUMBER};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_util::config::register_iox_object_store;
use futures::TryStreamExt;
use iox_catalog::interface::Catalog;
use iox_query::exec::ExecutorType;
use iox_tests::{
ParquetFileBuilder, TestCatalog, TestNamespace, TestParquetFileBuilder, TestPartition,
TestShard, TestTable,
@ -44,12 +51,7 @@ use iox_time::{MockProvider, Time, TimeProvider};
use object_store::{path::Path, DynObjectStore};
use parquet_file::storage::{ParquetStorage, StorageId};
use schema::sort::SortKey;
use compactor2::{
compact,
config::{Config, PartitionsSourceConfig},
hardcoded_components, Components, PanicDataFusionPlanner, PartitionInfo,
};
use tracker::AsyncSemaphoreMetrics;
// Default values for the test setup builder
const SHARD_INDEX: i32 = TRANSITION_SHARD_NUMBER;
@ -119,6 +121,7 @@ impl TestSetupBuilder<false> {
.with_invariant_check(Arc::clone(&invariant_check) as _);
let config = Config {
compaction_type: Default::default(),
shard_id: shard.shard.id,
metric_registry: catalog.metric_registry(),
catalog: catalog.catalog(),
@ -133,12 +136,13 @@ impl TestSetupBuilder<false> {
partition_concurrency: NonZeroUsize::new(1).unwrap(),
job_concurrency: NonZeroUsize::new(1).unwrap(),
partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(),
partition_threshold: PARTITION_THRESHOLD,
max_desired_file_size_bytes: MAX_DESIRE_FILE_SIZE,
percentage_max_file_size: PERCENTAGE_MAX_FILE_SIZE,
split_percentage: SPLIT_PERCENTAGE,
partition_timeout: Duration::from_secs(3_600),
partitions_source: PartitionsSourceConfig::CatalogRecentWrites,
partitions_source: PartitionsSourceConfig::CatalogRecentWrites {
threshold: PARTITION_THRESHOLD,
},
shadow_mode: false,
ignore_partition_skip_marker: false,
shard_config: None,
@ -566,6 +570,15 @@ impl<const WITH_FILES: bool> TestSetupBuilder<WITH_FILES> {
self
}
/// Switch this setup to cold compaction.
///
/// Sets the compaction type to [`CompactionType::Cold`] and selects partitions via
/// [`PartitionsSourceConfig::CatalogColdForWrites`] with a threshold of one hour.
pub fn for_cold_compaction(mut self) -> Self {
    // one hour, expressed in seconds
    let threshold = Duration::from_secs(60 * 60);

    self.config.partitions_source = PartitionsSourceConfig::CatalogColdForWrites { threshold };
    self.config.compaction_type = CompactionType::Cold;
    self
}
/// Create a [`TestSetup`]
pub async fn build(self) -> TestSetup {
let candidate_partition = Arc::new(PartitionInfo {

View File

@ -477,10 +477,12 @@ impl Config {
// settings from other configs. Can't use `#clap(flatten)` as the
// parameters are redundant with ingester's
let compactor_config = Compactor2Config {
compaction_type: Default::default(),
compaction_partition_minute_threshold: 10,
compaction_cold_partition_minute_threshold: 60,
compaction_partition_concurrency: NonZeroUsize::new(1).unwrap(),
compaction_job_concurrency: NonZeroUsize::new(1).unwrap(),
compaction_partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(),
compaction_partition_minute_threshold: 10,
query_exec_thread_count: Some(NonZeroUsize::new(1).unwrap()),
exec_mem_pool_bytes,
max_desired_file_size_bytes: 30_000,

View File

@ -1,6 +1,6 @@
use async_trait::async_trait;
use backoff::BackoffConfig;
use clap_blocks::compactor2::Compactor2Config;
use clap_blocks::compactor2::{CompactionType, Compactor2Config};
use compactor2::{
compactor::Compactor2,
config::{Config, PartitionsSourceConfig, ShardConfig},
@ -156,18 +156,22 @@ pub async fn create_compactor2_server_type(
n_shards: compactor_config.shard_count.expect("just checked"),
});
let partitions_source = match (
compactor_config.partition_filter,
let partitions_source = create_partition_source_config(
compactor_config.partition_filter.as_deref(),
compactor_config.process_all_partitions,
) {
(None, false) => PartitionsSourceConfig::CatalogRecentWrites,
(None, true) => PartitionsSourceConfig::CatalogAll,
(Some(ids), false) => {
PartitionsSourceConfig::Fixed(ids.into_iter().map(PartitionId::new).collect())
}
(Some(_), true) => panic!(
"provided partition ID filter and specific 'process all', this does not make sense"
),
compactor_config.compaction_type,
compactor_config.compaction_partition_minute_threshold,
compactor_config.compaction_cold_partition_minute_threshold,
);
// This is annoying to have two types that are so similar and have to convert between them, but
// this way compactor2 doesn't have to know about clap_blocks and vice versa. It would also
// be nice to have this as a `From` trait implementation, but this crate isn't allowed because
// neither type is defined in ioxd_compactor. This feels like the right place to do the
// conversion, though.
let compaction_type = match compactor_config.compaction_type {
CompactionType::Hot => compactor2::config::CompactionType::Hot,
CompactionType::Cold => compactor2::config::CompactionType::Cold,
};
let shard_id = Config::fetch_shard_id(
@ -178,6 +182,7 @@ pub async fn create_compactor2_server_type(
)
.await;
let compactor = Compactor2::start(Config {
compaction_type,
shard_id,
metric_registry: Arc::clone(&metric_registry),
catalog,
@ -190,9 +195,6 @@ pub async fn create_compactor2_server_type(
job_concurrency: compactor_config.compaction_job_concurrency,
partition_scratchpad_concurrency: compactor_config
.compaction_partition_scratchpad_concurrency,
partition_threshold: Duration::from_secs(
compactor_config.compaction_partition_minute_threshold * 60,
),
max_desired_file_size_bytes: compactor_config.max_desired_file_size_bytes,
percentage_max_file_size: compactor_config.percentage_max_file_size,
split_percentage: compactor_config.split_percentage,
@ -217,3 +219,111 @@ pub async fn create_compactor2_server_type(
common_state,
))
}
/// Derives the [`PartitionsSourceConfig`] from the relevant CLI settings.
///
/// * An explicit `partition_filter` yields a fixed list of partition IDs.
/// * `process_all_partitions` yields every partition in the catalog.
/// * Otherwise the `compaction_type` decides: hot compaction looks at partitions with recent
///   writes (using `compaction_partition_minute_threshold`), cold compaction at partitions that
///   have gone cold for writes (using `compaction_cold_partition_minute_threshold`). Both
///   thresholds are given in minutes.
///
/// # Panics
///
/// Panics if both a partition filter and `process_all_partitions` are specified, since that
/// combination is contradictory.
fn create_partition_source_config(
    partition_filter: Option<&[i64]>,
    process_all_partitions: bool,
    compaction_type: CompactionType,
    compaction_partition_minute_threshold: u64,
    compaction_cold_partition_minute_threshold: u64,
) -> PartitionsSourceConfig {
    // Evaluated lazily so that only the threshold that is actually used gets converted.
    let minutes = |count: u64| Duration::from_secs(count * 60);

    match partition_filter {
        Some(_) if process_all_partitions => panic!(
            "provided partition ID filter and specific 'process all', this does not make sense"
        ),
        Some(ids) => {
            PartitionsSourceConfig::Fixed(ids.iter().copied().map(PartitionId::new).collect())
        }
        None if process_all_partitions => PartitionsSourceConfig::CatalogAll,
        None => match compaction_type {
            CompactionType::Hot => PartitionsSourceConfig::CatalogRecentWrites {
                threshold: minutes(compaction_partition_minute_threshold),
            },
            CompactionType::Cold => PartitionsSourceConfig::CatalogColdForWrites {
                threshold: minutes(compaction_cold_partition_minute_threshold),
            },
        },
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Hot threshold (in minutes) passed by every test; only meaningful for hot compaction.
    const HOT_MINUTES: u64 = 10;
    /// Cold threshold (in minutes) passed by every test; only meaningful for cold compaction.
    const COLD_MINUTES: u64 = 60;

    /// Calls [`create_partition_source_config`] with the fixed test thresholds.
    fn source_config(
        partition_filter: Option<&[i64]>,
        process_all_partitions: bool,
        compaction_type: CompactionType,
    ) -> PartitionsSourceConfig {
        create_partition_source_config(
            partition_filter,
            process_all_partitions,
            compaction_type,
            HOT_MINUTES,
            COLD_MINUTES,
        )
    }

    #[test]
    #[should_panic(
        expected = "provided partition ID filter and specific 'process all', this does not make sense"
    )]
    fn process_all_and_partition_filter_incompatible() {
        // the compaction type is arbitrary here
        source_config(Some(&[1, 7]), true, CompactionType::Hot);
    }

    #[test]
    fn fixed_list_of_partitions() {
        // the compaction type is arbitrary here
        let actual = source_config(Some(&[1, 7]), false, CompactionType::Hot);

        let expected =
            PartitionsSourceConfig::Fixed([PartitionId::new(1), PartitionId::new(7)].into());
        assert_eq!(actual, expected);
    }

    #[test]
    fn all_in_the_catalog() {
        // the compaction type is arbitrary here
        let actual = source_config(None, true, CompactionType::Hot);

        assert_eq!(actual, PartitionsSourceConfig::CatalogAll,);
    }

    #[test]
    fn hot_compaction() {
        let actual = source_config(None, false, CompactionType::Hot);

        // 10 minutes
        let expected = PartitionsSourceConfig::CatalogRecentWrites {
            threshold: Duration::from_secs(600),
        };
        assert_eq!(actual, expected);
    }

    #[test]
    fn cold_compaction() {
        let actual = source_config(None, false, CompactionType::Cold);

        // 60 minutes
        let expected = PartitionsSourceConfig::CatalogColdForWrites {
            threshold: Duration::from_secs(3600),
        };
        assert_eq!(actual, expected);
    }
}