Merge pull request #7392 from influxdata/cn/redo

fix: Only look at the current branch files' status when looking for concurrent modifications
kodiakhq[bot] 2023-03-30 20:34:05 +00:00 committed by GitHub
commit f2ae47f72e
7 changed files with 289 additions and 6 deletions
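
At a high level, the change snapshots each branch's (file id, compaction level) pairs before compacting and compares that snapshot against the catalog again just before committing. Below is a condensed sketch of that flow, with simplified signatures and assumed import paths; the real code is in the compactor2 driver and components diffs that follow.

// Condensed sketch of the check this PR adds; simplified, not the exact driver code.
// `Components` and `SavedParquetFileState` are assumed to be in scope from the
// compactor2 component modules shown in the diffs below.
use std::sync::Arc;

use data_types::{ParquetFile, PartitionId};

async fn compact_branch_sketch(
    components: Arc<Components>,
    partition_id: PartitionId,
    branch: Vec<ParquetFile>,
) {
    // 1. Snapshot the (id, compaction level) pairs of the files in this branch.
    let saved = SavedParquetFileState::from(&branch);

    // 2. ...long-running compaction work for this branch happens here...

    // 3. Just before committing, re-read the partition's files from the catalog.
    let catalog_files = components.partition_files_source.fetch(partition_id).await;
    let current = SavedParquetFileState::from(&catalog_files);

    // 4. Compare the two snapshots. For now the filter only logs concurrent
    //    modifications and the commit proceeds either way.
    let _modified = components
        .changed_files_filter
        .apply(&saved, &current)
        .await;
}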

View File

@@ -56,7 +56,12 @@ impl Compactor2 {
tokio::select! {
_ = shutdown_captured.cancelled() => {}
_ = async {
compact(config.partition_concurrency, config.partition_timeout, Arc::clone(&job_semaphore), &components).await;
compact(
config.partition_concurrency,
config.partition_timeout,
Arc::clone(&job_semaphore),
&components
).await;
info!("compactor done");
} => {}

View File

@@ -0,0 +1,33 @@
use std::fmt::Display;
use super::{ChangedFilesFilter, SavedParquetFileState};
use async_trait::async_trait;
use observability_deps::tracing::info;
#[derive(Debug, Default, Copy, Clone)]
pub struct LoggingChangedFiles {}
impl LoggingChangedFiles {
pub fn new() -> Self {
Self {}
}
}
impl Display for LoggingChangedFiles {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "logging_changed_files")
}
}
#[async_trait]
impl ChangedFilesFilter for LoggingChangedFiles {
async fn apply(&self, old: &SavedParquetFileState, new: &SavedParquetFileState) -> bool {
if old.existing_files_modified(new) {
let modified_ids_and_levels = old.modified_ids_and_levels(new);
info!(?modified_ids_and_levels, "Concurrent modification detected");
}
false // we're ignoring the return value anyway for the moment
}
}
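
For context, a minimal usage sketch of this filter (not part of the PR's test suite): the `#[tokio::test]` wrapper, the import path, and the `iox_tests::ParquetFileBuilder` helper usage mirror the tests further down and are assumptions.

// Hypothetical usage sketch; the `use` path is assumed, not taken from this diff.
use compactor2::components::changed_files_filter::{
    logging::LoggingChangedFiles, ChangedFilesFilter, SavedParquetFileState,
};
use data_types::CompactionLevel;
use iox_tests::ParquetFileBuilder;

#[tokio::test]
async fn logging_filter_never_aborts() {
    // State captured when compaction of the branch started...
    let before = ParquetFileBuilder::new(1)
        .with_compaction_level(CompactionLevel::Initial)
        .build();
    let old = SavedParquetFileState::from([&before]);

    // ...vs. catalog state observed just before committing, where some other
    // process has already upgraded file 1 to a different level.
    let after = ParquetFileBuilder::new(1)
        .with_compaction_level(CompactionLevel::FileNonOverlapped)
        .build();
    let new = SavedParquetFileState::from([&after]);

    // The filter logs the modified (id, level) pairs via `info!`, but always
    // returns `false` for the moment, so callers commit regardless.
    let filter = LoggingChangedFiles::new();
    assert!(!filter.apply(&old, &new).await);
}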

View File

@@ -0,0 +1,213 @@
use std::{
collections::HashSet,
fmt::{Debug, Display},
};
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId};
pub mod logging;
/// Returns `true` if the files in the saved state have been changed according to the current state.
#[async_trait]
pub trait ChangedFilesFilter: Debug + Display + Send + Sync {
/// Return `true` if some other process modified the files in `old` such that they don't appear or appear with a
/// different compaction level than `new`, and thus we should stop compacting.
async fn apply(&self, old: &SavedParquetFileState, new: &SavedParquetFileState) -> bool;
}
/// Saved snapshot of a partition's Parquet files' IDs and compaction levels. Save this state at the beginning of a
/// compaction operation, then just before committing ask for the catalog state again. If the ID+compaction level pairs
/// in the initial saved state still appear in the latest catalog state (disregarding any new files that may appear in
/// the latest catalog state) we assume no other compactor instance has compacted the relevant files and this compactor
/// instance should commit its work. If any old ID+compaction level pairs are missing from the latest catalog state
/// (and thus show up in a set difference operation of `old - current`), throw away the work and do not commit as the
/// relevant Parquet files have been changed by some other process while this compactor instance was working.
#[derive(Debug, Clone)]
pub struct SavedParquetFileState {
ids_and_levels: HashSet<(ParquetFileId, CompactionLevel)>,
}
impl<'a, T> From<T> for SavedParquetFileState
where
T: IntoIterator<Item = &'a ParquetFile>,
{
fn from(parquet_files: T) -> Self {
let ids_and_levels = parquet_files
.into_iter()
.map(|pf| (pf.id, pf.compaction_level))
.collect();
Self { ids_and_levels }
}
}
impl SavedParquetFileState {
fn missing<'a>(
&'a self,
new: &'a Self,
) -> impl Iterator<Item = &'a (ParquetFileId, CompactionLevel)> {
let old = self;
old.ids_and_levels.difference(&new.ids_and_levels)
}
pub fn existing_files_modified(&self, new: &Self) -> bool {
let mut missing = self.missing(new);
// If there are any `(ParquetFileId, CompactionLevel)` pairs in `self` that are not present in `new`, that
// means some files were marked to delete or had their compaction level changed by some other process.
missing.next().is_some()
}
pub fn modified_ids_and_levels(&self, new: &Self) -> Vec<(ParquetFileId, CompactionLevel)> {
self.missing(new).cloned().collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use iox_tests::ParquetFileBuilder;
#[test]
fn saved_state_sorts_by_parquet_file_id() {
let pf_id1_level_0 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::Initial)
.build();
let pf_id2_level_2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
let pf_id3_level_1 = ParquetFileBuilder::new(3)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let saved_state_1 =
SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]);
let saved_state_2 =
SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0, &pf_id2_level_2]);
assert!(!saved_state_1.existing_files_modified(&saved_state_2));
assert!(saved_state_1
.modified_ids_and_levels(&saved_state_2)
.is_empty());
}
#[test]
fn both_empty_parquet_files() {
let saved_state_1 = SavedParquetFileState::from([]);
let saved_state_2 = SavedParquetFileState::from([]);
assert!(!saved_state_1.existing_files_modified(&saved_state_2));
assert!(saved_state_1
.modified_ids_and_levels(&saved_state_2)
.is_empty());
}
#[test]
fn missing_files_indicates_modifications() {
let pf_id1_level_0 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::Initial)
.build();
let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]);
let saved_state_2 = SavedParquetFileState::from([]);
assert!(saved_state_1.existing_files_modified(&saved_state_2));
assert_eq!(
saved_state_1.modified_ids_and_levels(&saved_state_2),
&[(ParquetFileId::new(1), CompactionLevel::Initial)]
);
}
#[test]
fn disregard_new_files() {
let pf_id1_level_0 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::Initial)
.build();
// New files of any level don't affect whether the old saved state is considered modified
let pf_id2_level_2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
let pf_id3_level_1 = ParquetFileBuilder::new(3)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let pf_id4_level_0 = ParquetFileBuilder::new(4)
.with_compaction_level(CompactionLevel::Initial)
.build();
let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]);
let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]);
assert!(!saved_state_1.existing_files_modified(&saved_state_2));
assert!(saved_state_1
.modified_ids_and_levels(&saved_state_2)
.is_empty());
let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_1]);
assert!(!saved_state_1.existing_files_modified(&saved_state_2));
assert!(saved_state_1
.modified_ids_and_levels(&saved_state_2)
.is_empty());
let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id4_level_0]);
assert!(!saved_state_1.existing_files_modified(&saved_state_2));
assert!(saved_state_1
.modified_ids_and_levels(&saved_state_2)
.is_empty());
let saved_state_2 = SavedParquetFileState::from([
&pf_id1_level_0,
&pf_id2_level_2,
&pf_id3_level_1,
&pf_id4_level_0,
]);
assert!(!saved_state_1.existing_files_modified(&saved_state_2));
assert!(saved_state_1
.modified_ids_and_levels(&saved_state_2)
.is_empty());
}
#[test]
fn changed_compaction_level_indicates_modification() {
let pf_id1_level_0 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::Initial)
.build();
let pf_id1_level_1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let pf_id2_level_2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]);
let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_1, &pf_id2_level_2]);
assert!(saved_state_1.existing_files_modified(&saved_state_2));
assert_eq!(
saved_state_1.modified_ids_and_levels(&saved_state_2),
&[(ParquetFileId::new(1), CompactionLevel::Initial)]
);
}
#[test]
fn same_number_of_files_different_ids_indicates_modification() {
let pf_id1_level_0 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::Initial)
.build();
let pf_id2_level_0 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Initial)
.build();
let pf_id3_level_2 = ParquetFileBuilder::new(3)
.with_compaction_level(CompactionLevel::Final)
.build();
let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_2]);
let saved_state_2 = SavedParquetFileState::from([&pf_id2_level_0, &pf_id3_level_2]);
assert!(saved_state_1.existing_files_modified(&saved_state_2));
assert_eq!(
saved_state_1.modified_ids_and_levels(&saved_state_2),
&[(ParquetFileId::new(1), CompactionLevel::Initial)]
);
}
}
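
The trait documentation says a `true` return means "stop compacting", but `LoggingChangedFiles` deliberately returns `false` for now. As a sketch only (not part of this PR), a stricter implementation that actually signals abort could reuse `existing_files_modified`; `ChangedFilesFilter` and `SavedParquetFileState` from the module above are assumed to be in scope.

// Hypothetical stricter filter; illustrates the intended trait contract only.
use std::fmt;

use async_trait::async_trait;

#[derive(Debug, Default, Copy, Clone)]
struct AbortOnChangedFiles;

impl fmt::Display for AbortOnChangedFiles {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "abort_on_changed_files")
    }
}

#[async_trait]
impl ChangedFilesFilter for AbortOnChangedFiles {
    async fn apply(&self, old: &SavedParquetFileState, new: &SavedParquetFileState) -> bool {
        // `true` means another process deleted or re-leveled one of our input
        // files, so this compactor instance should throw away its work.
        old.existing_files_modified(new)
    }
}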

View File

@@ -18,6 +18,7 @@ use crate::{
};
use super::{
changed_files_filter::logging::LoggingChangedFiles,
combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions},
commit::{
catalog::CatalogCommit, logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper,
@@ -338,6 +339,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
partition_resource_limit_conditions,
),
),
changed_files_filter: Arc::new(LoggingChangedFiles::new()),
})
}
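
Because the filter is just another component, swapping in a different implementation (for example the hypothetical `AbortOnChangedFiles` sketched earlier) would only require changing this one field in the wiring:

// Hypothetical alternative wiring; `AbortOnChangedFiles` is not part of this PR.
changed_files_filter: Arc::new(AbortOnChangedFiles::default()),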

View File

@@ -1,15 +1,17 @@
use std::sync::Arc;
use self::{
commit::Commit, df_plan_exec::DataFusionPlanExec, df_planner::DataFusionPlanner,
divide_initial::DivideInitial, file_classifier::FileClassifier, ir_planner::IRPlanner,
parquet_files_sink::ParquetFilesSink, partition_done_sink::PartitionDoneSink,
partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter,
partition_info_source::PartitionInfoSource, partition_stream::PartitionStream,
changed_files_filter::ChangedFilesFilter, commit::Commit, df_plan_exec::DataFusionPlanExec,
df_planner::DataFusionPlanner, divide_initial::DivideInitial, file_classifier::FileClassifier,
ir_planner::IRPlanner, parquet_files_sink::ParquetFilesSink,
partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource,
partition_filter::PartitionFilter, partition_info_source::PartitionInfoSource,
partition_stream::PartitionStream,
post_classification_partition_filter::PostClassificationPartitionFilter,
round_info_source::RoundInfoSource, round_split::RoundSplit, scratchpad::ScratchpadGen,
};
pub mod changed_files_filter;
pub mod combos;
pub mod commit;
pub mod df_plan_exec;
@@ -78,4 +80,6 @@ pub struct Components {
pub scratchpad_gen: Arc<dyn ScratchpadGen>,
/// Classify files for each compaction branch.
pub file_classifier: Arc<dyn FileClassifier>,
/// Check for other processes modifying files.
pub changed_files_filter: Arc<dyn ChangedFilesFilter>,
}

View File

@@ -110,6 +110,7 @@ pub fn log_components(components: &Components) {
divide_initial,
scratchpad_gen,
file_classifier,
changed_files_filter,
} = components;
info!(
@@ -129,6 +130,7 @@ pub fn log_components(components: &Components) {
%divide_initial,
%scratchpad_gen,
%file_classifier,
%changed_files_filter,
"component setup",
);
}

View File

@@ -9,6 +9,7 @@ use tracker::InstrumentedAsyncSemaphore;
use crate::{
components::{
changed_files_filter::SavedParquetFileState,
scratchpad::Scratchpad,
timeout::{timeout_with_progress_checking, TimeoutWithProgress},
Components,
@@ -219,6 +220,11 @@ async fn try_compact_partition(
let mut files_next = files_later;
// loop for each "Branch"
for branch in branches {
// Keep the current state as a check to make sure this is the only compactor modifying this branch's
// files. Check that the catalog state for the files in this set is the same before committing and, if not,
// throw away the compaction work we've done.
let saved_parquet_file_state = SavedParquetFileState::from(&branch);
let input_paths: Vec<ParquetFilePath> =
branch.iter().map(ParquetFilePath::from).collect();
@@ -276,6 +282,7 @@ async fn try_compact_partition(
let (created_files, upgraded_files) = update_catalog(
Arc::clone(&components),
partition_id,
saved_parquet_file_state,
files_to_delete,
upgrade,
created_file_params,
@@ -409,17 +416,34 @@ async fn upload_files_to_object_store(
.collect()
}
async fn fetch_and_save_parquet_file_state(
components: &Components,
partition_id: PartitionId,
) -> SavedParquetFileState {
let catalog_files = components.partition_files_source.fetch(partition_id).await;
SavedParquetFileState::from(&catalog_files)
}
/// Update the catalog to create, soft delete, and upgrade the given input files
/// to the provided target level.
/// Returns the created and upgraded files.
async fn update_catalog(
components: Arc<Components>,
partition_id: PartitionId,
saved_parquet_file_state: SavedParquetFileState,
files_to_delete: Vec<ParquetFile>,
files_to_upgrade: Vec<ParquetFile>,
file_params_to_create: Vec<ParquetFileParams>,
target_level: CompactionLevel,
) -> (Vec<ParquetFile>, Vec<ParquetFile>) {
let current_parquet_file_state =
fetch_and_save_parquet_file_state(&components, partition_id).await;
// Right now this only logs; in the future we might decide not to commit these changes
let _ignore = components
.changed_files_filter
.apply(&saved_parquet_file_state, &current_parquet_file_state)
.await;
let created_ids = components
.commit
.commit(