feat(compactor): gossip compaction complete events

Add post-compaction calls to send a "compaction complete" gossip event
containing the set of upgraded, deleted, and newly created parquet
files.
pull/24376/head
Dom Dwyer 2023-08-29 16:05:07 +02:00
parent 5aee376766
commit 78d40ba59a
6 changed files with 64 additions and 10 deletions
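
For context on the wiring below: the gossip handle is threaded through the compactor as an `Option<Arc<CompactionEventTx>>`, cloned into each concurrent partition job, and every broadcast site degrades to a no-op when gossip is disabled. A minimal sketch of that pattern (toy `EventTx` type, not the IOx implementation):

    use std::sync::Arc;

    // Stand-in for the real sender type (hypothetical).
    #[derive(Debug)]
    struct EventTx;

    impl EventTx {
        fn broadcast(&self, event: &str) {
            // Fire-and-forget: the real sender enqueues the event for a
            // background actor task rather than blocking the caller.
            println!("broadcasting: {event}");
        }
    }

    // Each partition job clones the optional handle; the `Arc` keeps a
    // single sender alive across all concurrent jobs.
    fn compact_one(gossip: Option<Arc<EventTx>>) {
        // ... compaction work ...
        notify(gossip.as_deref(), "compaction complete");
    }

    // Broadcasting is a no-op when gossip is not configured.
    fn notify(gossip: Option<&EventTx>, event: &str) {
        let Some(tx) = gossip else { return };
        tx.broadcast(event);
    }

    fn main() {
        compact_one(Some(Arc::new(EventTx))); // gossip enabled
        compact_one(None); // gossip disabled: silently skipped
    }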

View File

@@ -55,12 +55,12 @@ impl Compactor {
         let df_semaphore = Arc::new(semaphore_metrics.new_semaphore(config.df_concurrency.get()));

         // Initialise the gossip subsystem, if configured.
-        let _gossip = match config.gossip_bind_address {
+        let gossip = match config.gossip_bind_address {
             Some(bind) => {
                 // Initialise the gossip subsystem.
                 let handle = gossip::Builder::<_, Topic>::new(
                     config.gossip_seeds,
-                    NopDispatcher::default(),
+                    NopDispatcher,
                     Arc::clone(&config.metric_registry),
                 )
                 // Configure the compactor to subscribe to no topics - it
@@ -73,7 +73,7 @@ impl Compactor {
                 let event_tx =
                     gossip_compaction::tx::CompactionEventTx::<CompactionEvent>::new(handle);

-                Some(event_tx)
+                Some(Arc::new(event_tx))
             }
             None => None,
         };
@@ -87,7 +87,8 @@ impl Compactor {
             config.partition_concurrency,
             config.partition_timeout,
             Arc::clone(&df_semaphore),
-            &components
+            &components,
+            gossip,
         ).await;

         info!("compactor done");

View File

@@ -4,6 +4,7 @@ use chrono::Utc;
 use compactor_scheduler::CompactionJob;
 use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
 use futures::{stream, StreamExt, TryStreamExt};
+use gossip_compaction::tx::CompactionEventTx;
 use iox_query::exec::query_tracing::send_metrics_to_tracing;
 use observability_deps::tracing::info;
 use parquet_file::ParquetFilePath;
@@ -33,6 +34,7 @@ pub async fn compact(
     partition_timeout: Duration,
     df_semaphore: Arc<InstrumentedAsyncSemaphore>,
     components: &Arc<Components>,
+    gossip_handle: Option<Arc<CompactionEventTx>>,
 ) {
     components
         .compaction_job_stream
@@ -54,6 +56,7 @@ pub async fn compact(
                 partition_timeout,
                 Arc::clone(&df_semaphore),
                 components,
+                gossip_handle.clone(),
             )
         })
         .buffer_unordered(partition_concurrency.get())
@@ -67,6 +70,7 @@ async fn compact_partition(
     partition_timeout: Duration,
     df_semaphore: Arc<InstrumentedAsyncSemaphore>,
     components: Arc<Components>,
+    gossip_handle: Option<Arc<CompactionEventTx>>,
 ) {
     let partition_id = job.partition_id;
     info!(partition_id = partition_id.get(), timeout = ?partition_timeout, "compact partition",);
@@ -86,6 +90,7 @@ async fn compact_partition(
                 components,
                 scratchpad,
                 transmit_progress_signal,
+                gossip_handle,
             )
             .await // errors detected in the CompactionJob update_job_status(), will be handled in the timeout_with_progress_checking
         }
@@ -210,6 +215,7 @@ async fn try_compact_partition(
     components: Arc<Components>,
     scratchpad_ctx: Arc<dyn Scratchpad>,
     transmit_progress_signal: Sender<bool>,
+    gossip_handle: Option<Arc<CompactionEventTx>>,
 ) -> Result<(), DynError> {
     let partition_id = job.partition_id;
     let mut files = components.partition_files_source.fetch(partition_id).await;
@@ -273,6 +279,7 @@ async fn try_compact_partition(
             let job = job.clone();
             let branch_span = round_span.child("branch");
             let round_info = round_info.clone();
+            let gossip_handle = gossip_handle.clone();

             async move {
                 execute_branch(
@@ -285,6 +292,7 @@ async fn try_compact_partition(
                     partition_info,
                     round_info,
                     transmit_progress_signal,
+                    gossip_handle,
                 )
                 .await
             }
@@ -310,6 +318,7 @@ async fn execute_branch(
     partition_info: Arc<PartitionInfo>,
     round_info: RoundInfo,
     transmit_progress_signal: Arc<Sender<bool>>,
+    gossip_handle: Option<Arc<CompactionEventTx>>,
 ) -> Result<Vec<ParquetFile>, DynError> {
     let files_next: Vec<ParquetFile> = Vec::new();
@@ -377,7 +386,7 @@ async fn execute_branch(
             let files_to_delete = chunk
                 .iter()
                 .flat_map(|plan| plan.input_parquet_files())
-                .collect();
+                .collect::<Vec<_>>();

             // Compact & Split
             let created_file_params = run_plans(
@@ -424,13 +433,22 @@ async fn execute_branch(
                 Arc::clone(&components),
                 job.clone(),
                 &saved_parquet_file_state,
-                files_to_delete,
+                &files_to_delete,
                 upgrade,
                 created_file_params,
                 target_level,
             )
             .await?;

+            // Broadcast the compaction event to gossip peers.
+            gossip_compaction_complete(
+                gossip_handle.as_deref(),
+                &created_files,
+                &upgraded_files,
+                files_to_delete,
+                target_level,
+            );
+
             // we only need to upgrade files on the first iteration, so empty the upgrade list for next loop.
             upgrade = Vec::new();
@@ -450,6 +468,36 @@ async fn execute_branch(
     Ok(files_next)
 }

+/// Broadcast a compaction completion event over gossip.
+fn gossip_compaction_complete(
+    gossip_handle: Option<&CompactionEventTx>,
+    created_files: &[ParquetFile],
+    upgraded_files: &[ParquetFile],
+    deleted_files: Vec<ParquetFile>,
+    target_level: CompactionLevel,
+) {
+    use generated_types::influxdata::iox::catalog::v1 as catalog_proto;
+    use generated_types::influxdata::iox::gossip::v1 as proto;
+
+    let handle = match gossip_handle {
+        Some(v) => v,
+        None => return,
+    };
+
+    let new_files = created_files
+        .iter()
+        .cloned()
+        .map(catalog_proto::ParquetFile::from)
+        .collect::<Vec<_>>();
+
+    handle.broadcast(proto::CompactionEvent {
+        new_files,
+        updated_file_ids: upgraded_files.iter().map(|v| v.id.get()).collect(),
+        deleted_file_ids: deleted_files.iter().map(|v| v.id.get()).collect(),
+        upgraded_target_level: target_level as _,
+    });
+}
+
 /// Compact or split given files
 async fn run_plans(
     span: SpanRecorder,
@@ -628,7 +676,7 @@ async fn update_catalog(
     components: Arc<Components>,
     job: CompactionJob,
     saved_parquet_file_state: &SavedParquetFileState,
-    files_to_delete: Vec<ParquetFile>,
+    files_to_delete: &[ParquetFile],
     files_to_upgrade: Vec<ParquetFile>,
     file_params_to_create: Vec<ParquetFileParams>,
     target_level: CompactionLevel,
@@ -646,7 +694,7 @@ async fn update_catalog(
         .commit
         .commit(
             job,
-            &files_to_delete,
+            files_to_delete,
             &files_to_upgrade,
             &file_params_to_create,
             target_level,
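
To make the payload concrete: a peer subscribing to these events could fold one into a local view of the partition's files roughly as follows. The event fields mirror the `proto::CompactionEvent` constructed above; the `FileCache` type and the toy structs are hypothetical stand-ins, not IOx types.

    // Toy stand-ins mirroring the fields of the proto::CompactionEvent
    // built above (the real types are prost-generated).
    struct ProtoParquetFile;

    struct CompactionEvent {
        new_files: Vec<ProtoParquetFile>,
        updated_file_ids: Vec<i64>,
        deleted_file_ids: Vec<i64>,
        upgraded_target_level: i32,
    }

    // Hypothetical peer-local cache of parquet file metadata.
    #[derive(Default)]
    struct FileCache;

    impl FileCache {
        fn insert(&mut self, _file: &ProtoParquetFile) { /* ... */ }
        fn remove(&mut self, _id: i64) { /* ... */ }
        fn set_level(&mut self, _id: i64, _level: i32) { /* ... */ }
    }

    // Apply a compaction-complete event: add the newly created files, drop
    // the compacted inputs, and bump the level of the upgraded files.
    fn apply(cache: &mut FileCache, event: &CompactionEvent) {
        for file in &event.new_files {
            cache.insert(file);
        }
        for id in &event.deleted_file_ids {
            cache.remove(*id);
        }
        for id in &event.updated_file_ids {
            cache.set_level(*id, event.upgraded_target_level);
        }
    }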

View File

@@ -774,6 +774,7 @@ impl TestSetup {
             config.partition_timeout,
             df_semaphore,
             &components,
+            None,
         )
         .await;

View File

@@ -26,7 +26,7 @@ use tokio::{
 /// decoupling the caller from the latency of processing each frame. Dropping
 /// the [`CompactionEventTx`] stops this background actor task.
 #[derive(Debug)]
-pub struct CompactionEventTx<T> {
+pub struct CompactionEventTx<T = CompactionEvent> {
     tx: mpsc::Sender<T>,
     task: JoinHandle<()>,
 }
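
Giving `T` a default of `CompactionEvent` lets call sites write plain `CompactionEventTx` instead of the `CompactionEventTx::<CompactionEvent>` turbofish seen in the compactor's init code above. The language feature in miniature (toy types):

    #[derive(Debug)]
    struct CompactionEvent;

    // `T` defaults to `CompactionEvent`, so a bare `Tx` below means
    // `Tx<CompactionEvent>`.
    struct Tx<T = CompactionEvent> {
        queued: Vec<T>,
    }

    fn main() {
        // No turbofish needed at the common call site.
        let tx: Tx = Tx { queued: vec![CompactionEvent] };
        // Other payload types remain available.
        let other: Tx<String> = Tx { queued: vec!["still generic".to_owned()] };
        println!("{} / {}", tx.queued.len(), other.queued.len());
    }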

View File

@@ -522,6 +522,7 @@ impl Config {
             max_num_columns_per_table: 200,
             max_num_files_per_plan: 200,
             max_partition_fetch_queries_per_second: Some(500),
+            gossip_config: GossipConfig::disabled(),
         };

         let querier_config = QuerierConfig {

View File

@@ -190,7 +190,10 @@ pub async fn create_compactor_server_type(
         max_partition_fetch_queries_per_second: compactor_config
             .max_partition_fetch_queries_per_second,
         gossip_seeds: compactor_config.gossip_config.seed_list,
-        gossip_bind_address: compactor_config.gossip_config.gossip_bind_address,
+        gossip_bind_address: compactor_config
+            .gossip_config
+            .gossip_bind_address
+            .map(Into::into),
     })
     .await;
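
The added `.map(Into::into)` converts the CLI-layer bind address into the type the compactor config expects while preserving `None` for the disabled case. The shape of the conversion with toy stand-in types:

    // `Option::map(Into::into)` turns `Option<A>` into `Option<B>` given
    // `impl From<A> for B`.
    struct CliAddr(String); // stand-in for the clap-layer address type
    struct BindAddr(String); // stand-in for the compactor-layer type

    impl From<CliAddr> for BindAddr {
        fn from(a: CliAddr) -> Self {
            BindAddr(a.0)
        }
    }

    fn main() {
        let cli: Option<CliAddr> = Some(CliAddr("0.0.0.0:4242".to_owned()));
        let bind: Option<BindAddr> = cli.map(Into::into);
        assert!(bind.is_some());
    }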