diff --git a/compactor/src/compactor.rs b/compactor/src/compactor.rs index 72063e7a4b..c9ec29e0dc 100644 --- a/compactor/src/compactor.rs +++ b/compactor/src/compactor.rs @@ -55,12 +55,12 @@ impl Compactor { let df_semaphore = Arc::new(semaphore_metrics.new_semaphore(config.df_concurrency.get())); // Initialise the gossip subsystem, if configured. - let _gossip = match config.gossip_bind_address { + let gossip = match config.gossip_bind_address { Some(bind) => { // Initialise the gossip subsystem. let handle = gossip::Builder::<_, Topic>::new( config.gossip_seeds, - NopDispatcher::default(), + NopDispatcher, Arc::clone(&config.metric_registry), ) // Configure the compactor to subscribe to no topics - it @@ -73,7 +73,7 @@ impl Compactor { let event_tx = gossip_compaction::tx::CompactionEventTx::::new(handle); - Some(event_tx) + Some(Arc::new(event_tx)) } None => None, }; @@ -87,7 +87,8 @@ impl Compactor { config.partition_concurrency, config.partition_timeout, Arc::clone(&df_semaphore), - &components + &components, + gossip, ).await; info!("compactor done"); diff --git a/compactor/src/driver.rs b/compactor/src/driver.rs index f1df778516..6d4ff38c66 100644 --- a/compactor/src/driver.rs +++ b/compactor/src/driver.rs @@ -4,6 +4,7 @@ use chrono::Utc; use compactor_scheduler::CompactionJob; use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId}; use futures::{stream, StreamExt, TryStreamExt}; +use gossip_compaction::tx::CompactionEventTx; use iox_query::exec::query_tracing::send_metrics_to_tracing; use observability_deps::tracing::info; use parquet_file::ParquetFilePath; @@ -33,6 +34,7 @@ pub async fn compact( partition_timeout: Duration, df_semaphore: Arc, components: &Arc, + gossip_handle: Option>, ) { components .compaction_job_stream @@ -54,6 +56,7 @@ pub async fn compact( partition_timeout, Arc::clone(&df_semaphore), components, + gossip_handle.clone(), ) }) .buffer_unordered(partition_concurrency.get()) @@ -67,6 +70,7 @@ async fn compact_partition( partition_timeout: Duration, df_semaphore: Arc, components: Arc, + gossip_handle: Option>, ) { let partition_id = job.partition_id; info!(partition_id = partition_id.get(), timeout = ?partition_timeout, "compact partition",); @@ -86,6 +90,7 @@ async fn compact_partition( components, scratchpad, transmit_progress_signal, + gossip_handle, ) .await // errors detected in the CompactionJob update_job_status(), will be handled in the timeout_with_progress_checking } @@ -210,6 +215,7 @@ async fn try_compact_partition( components: Arc, scratchpad_ctx: Arc, transmit_progress_signal: Sender, + gossip_handle: Option>, ) -> Result<(), DynError> { let partition_id = job.partition_id; let mut files = components.partition_files_source.fetch(partition_id).await; @@ -273,6 +279,7 @@ async fn try_compact_partition( let job = job.clone(); let branch_span = round_span.child("branch"); let round_info = round_info.clone(); + let gossip_handle = gossip_handle.clone(); async move { execute_branch( @@ -285,6 +292,7 @@ async fn try_compact_partition( partition_info, round_info, transmit_progress_signal, + gossip_handle, ) .await } @@ -310,6 +318,7 @@ async fn execute_branch( partition_info: Arc, round_info: RoundInfo, transmit_progress_signal: Arc>, + gossip_handle: Option>, ) -> Result, DynError> { let files_next: Vec = Vec::new(); @@ -377,7 +386,7 @@ async fn execute_branch( let files_to_delete = chunk .iter() .flat_map(|plan| plan.input_parquet_files()) - .collect(); + .collect::>(); // Compact & Split let created_file_params = run_plans( @@ -424,13 +433,22 @@ async fn execute_branch( Arc::clone(&components), job.clone(), &saved_parquet_file_state, - files_to_delete, + &files_to_delete, upgrade, created_file_params, target_level, ) .await?; + // Broadcast the compaction event to gossip peers. + gossip_compaction_complete( + gossip_handle.as_deref(), + &created_files, + &upgraded_files, + files_to_delete, + target_level, + ); + // we only need to upgrade files on the first iteration, so empty the upgrade list for next loop. upgrade = Vec::new(); @@ -450,6 +468,36 @@ async fn execute_branch( Ok(files_next) } +/// Broadcast a compaction completion event over gossip. +fn gossip_compaction_complete( + gossip_handle: Option<&CompactionEventTx>, + created_files: &[ParquetFile], + upgraded_files: &[ParquetFile], + deleted_files: Vec, + target_level: CompactionLevel, +) { + use generated_types::influxdata::iox::catalog::v1 as catalog_proto; + use generated_types::influxdata::iox::gossip::v1 as proto; + + let handle = match gossip_handle { + Some(v) => v, + None => return, + }; + + let new_files = created_files + .iter() + .cloned() + .map(catalog_proto::ParquetFile::from) + .collect::>(); + + handle.broadcast(proto::CompactionEvent { + new_files, + updated_file_ids: upgraded_files.iter().map(|v| v.id.get()).collect(), + deleted_file_ids: deleted_files.iter().map(|v| v.id.get()).collect(), + upgraded_target_level: target_level as _, + }); +} + /// Compact or split given files async fn run_plans( span: SpanRecorder, @@ -628,7 +676,7 @@ async fn update_catalog( components: Arc, job: CompactionJob, saved_parquet_file_state: &SavedParquetFileState, - files_to_delete: Vec, + files_to_delete: &[ParquetFile], files_to_upgrade: Vec, file_params_to_create: Vec, target_level: CompactionLevel, @@ -646,7 +694,7 @@ async fn update_catalog( .commit .commit( job, - &files_to_delete, + files_to_delete, &files_to_upgrade, &file_params_to_create, target_level, diff --git a/compactor_test_utils/src/lib.rs b/compactor_test_utils/src/lib.rs index efa5da2cf0..ad59192e82 100644 --- a/compactor_test_utils/src/lib.rs +++ b/compactor_test_utils/src/lib.rs @@ -774,6 +774,7 @@ impl TestSetup { config.partition_timeout, df_semaphore, &components, + None, ) .await; diff --git a/gossip_compaction/src/tx.rs b/gossip_compaction/src/tx.rs index a15deeaae0..2f2f43e767 100644 --- a/gossip_compaction/src/tx.rs +++ b/gossip_compaction/src/tx.rs @@ -26,7 +26,7 @@ use tokio::{ /// decoupling the caller from the latency of processing each frame. Dropping /// the [`CompactionEventTx`] stops this background actor task. #[derive(Debug)] -pub struct CompactionEventTx { +pub struct CompactionEventTx { tx: mpsc::Sender, task: JoinHandle<()>, } diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 21b1c010e0..1f578afa67 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -522,6 +522,7 @@ impl Config { max_num_columns_per_table: 200, max_num_files_per_plan: 200, max_partition_fetch_queries_per_second: Some(500), + gossip_config: GossipConfig::disabled(), }; let querier_config = QuerierConfig { diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index 337e4d50c7..5b4e895839 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -190,7 +190,10 @@ pub async fn create_compactor_server_type( max_partition_fetch_queries_per_second: compactor_config .max_partition_fetch_queries_per_second, gossip_seeds: compactor_config.gossip_config.seed_list, - gossip_bind_address: compactor_config.gossip_config.gossip_bind_address, + gossip_bind_address: compactor_config + .gossip_config + .gossip_bind_address + .map(Into::into), }) .await;