From 956d7bcee488bf057ea041d3a2a566f9fef71dc9 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 30 Mar 2023 12:39:03 -0400 Subject: [PATCH 1/5] revert: "revert: Merge pull request #7369 from influxdata/cn/parquet-file-saved-status" This reverts commit 0d7393f2c1133413af862cf4f70603eba7b82a46. --- compactor2/src/compactor.rs | 7 +- compactor2/src/components/hardcoded.rs | 5 +- compactor2/src/driver.rs | 39 ++++++- compactor2/src/error.rs | 8 ++ compactor2/src/partition_info.rs | 151 ++++++++++++++++++++++++- 5 files changed, 203 insertions(+), 7 deletions(-) diff --git a/compactor2/src/compactor.rs b/compactor2/src/compactor.rs index 87a3642676..b57635e109 100644 --- a/compactor2/src/compactor.rs +++ b/compactor2/src/compactor.rs @@ -56,7 +56,12 @@ impl Compactor2 { tokio::select! { _ = shutdown_captured.cancelled() => {} _ = async { - compact(config.partition_concurrency, config.partition_timeout, Arc::clone(&job_semaphore), &components).await; + compact( + config.partition_concurrency, + config.partition_timeout, + Arc::clone(&job_semaphore), + &components + ).await; info!("compactor done"); } => {} diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 36cc60e620..2d1020780e 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -191,7 +191,10 @@ pub fn hardcoded_components(config: &Config) -> Arc { .filter(|kind| { // use explicit match statement so we never forget to add new variants match kind { - ErrorKind::OutOfMemory | ErrorKind::Timeout | ErrorKind::Unknown => true, + ErrorKind::OutOfMemory + | ErrorKind::Timeout + | ErrorKind::ConcurrentModification + | ErrorKind::Unknown => true, ErrorKind::ObjectStore => false, } }) diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index 831f0b6324..8ea00f9242 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -15,7 +15,7 @@ use crate::{ }, error::{DynError, ErrorKind, 
SimpleError}, file_classification::{FileClassification, FilesForProgress, FilesToSplitOrCompact}, - partition_info::PartitionInfo, + partition_info::{PartitionInfo, SavedParquetFileState}, PlanIR, }; @@ -219,6 +219,12 @@ async fn try_compact_partition( let mut files_next = files_later; // loop for each "Branch" for branch in branches { + // Keep the current state as a check to make sure this is the only compactor modifying this partition's + // files. Check that the catalog state matches this before committing and, if it doesn't match, throw away + // the compaction work we've done. + let saved_parquet_file_state = + fetch_and_save_parquet_file_state(&components, partition_id).await; + let input_paths: Vec = branch.iter().map(ParquetFilePath::from).collect(); @@ -276,12 +282,13 @@ async fn try_compact_partition( let (created_files, upgraded_files) = update_catalog( Arc::clone(&components), partition_id, + saved_parquet_file_state, files_to_delete, upgrade, created_file_params, target_level, ) - .await; + .await?; // Extend created files, upgraded files and files_to_keep to files_next files_next.extend(created_files); @@ -409,17 +416,41 @@ async fn upload_files_to_object_store( .collect() } +async fn fetch_and_save_parquet_file_state( + components: &Components, + partition_id: PartitionId, +) -> SavedParquetFileState { + let catalog_files = components.partition_files_source.fetch(partition_id).await; + SavedParquetFileState::from(&catalog_files) +} + /// Update the catalog to create, soft delete and upgrade corresponding given input /// to provided target level /// Return created and upgraded files async fn update_catalog( components: Arc, partition_id: PartitionId, + saved_parquet_file_state: SavedParquetFileState, files_to_delete: Vec, files_to_upgrade: Vec, file_params_to_create: Vec, target_level: CompactionLevel, -) -> (Vec, Vec) { +) -> Result<(Vec, Vec), DynError> { + let current_parquet_file_state = + fetch_and_save_parquet_file_state(&components, 
partition_id).await; + + if saved_parquet_file_state != current_parquet_file_state { + // Someone else has changed the files in the catalog since we started compacting; throw away our work and + // don't commit anything. + return Err(Box::new(SimpleError::new( + ErrorKind::ConcurrentModification, + format!( + "Parquet files for partition {partition_id} have been modified since compaction started. \ + Saved: {saved_parquet_file_state:?} != Current: {current_parquet_file_state:?}" + ), + ))); + } + let created_ids = components .commit .commit( @@ -447,5 +478,5 @@ async fn update_catalog( }) .collect::>(); - (created_file_params, upgraded_files) + Ok((created_file_params, upgraded_files)) } diff --git a/compactor2/src/error.rs b/compactor2/src/error.rs index 3ae7be1b2b..aed1982a35 100644 --- a/compactor2/src/error.rs +++ b/compactor2/src/error.rs @@ -23,6 +23,12 @@ pub enum ErrorKind { /// Partition took too long. Timeout, + /// Concurrent modification. + /// + /// This compactor instance expected to be the only process working on this partition's Parquet files, but the + /// Parquet files were modified while the compactor was doing work, so the work was thrown away and not committed. + ConcurrentModification, + /// Unknown/unexpected error. /// /// This will likely mark the affected partition as "skipped" and the compactor will no longer touch it. 
@@ -36,6 +42,7 @@ impl ErrorKind { Self::ObjectStore, Self::OutOfMemory, Self::Timeout, + Self::ConcurrentModification, Self::Unknown, ] } @@ -46,6 +53,7 @@ impl ErrorKind { Self::ObjectStore => "object_store", Self::OutOfMemory => "out_of_memory", Self::Timeout => "timeout", + Self::ConcurrentModification => "concurrent_modification", Self::Unknown => "unknown", } } diff --git a/compactor2/src/partition_info.rs b/compactor2/src/partition_info.rs index ada8b2da31..54835d0fe2 100644 --- a/compactor2/src/partition_info.rs +++ b/compactor2/src/partition_info.rs @@ -2,7 +2,10 @@ use std::sync::Arc; -use data_types::{NamespaceId, PartitionId, PartitionKey, Table, TableSchema}; +use data_types::{ + CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, PartitionId, PartitionKey, Table, + TableSchema, +}; use schema::sort::SortKey; /// Information about the Partition being compacted @@ -36,3 +39,149 @@ impl PartitionInfo { self.table_schema.column_count() } } + +/// Saved snapshot of a partition's Parquet files' IDs and compaction levels. Save this state at the beginning of a +/// compaction operation, then just before committing ask for the state again. If the two saved states are identical, +/// we assume no other compactor instance has compacted this partition and this compactor instance should commit its +/// work. If the two saved states differ, throw away the work and do not commit as the Parquet files have been changed +/// by some other process while this compactor instance was working. 
+#[derive(Debug, Clone, PartialEq)] +pub(crate) struct SavedParquetFileState { + ids_and_levels: Vec<(ParquetFileId, CompactionLevel)>, +} + +impl<'a, T> From for SavedParquetFileState +where + T: IntoIterator, +{ + fn from(parquet_files: T) -> Self { + let mut ids_and_levels: Vec<_> = parquet_files + .into_iter() + .map(|pf| (pf.id, pf.compaction_level)) + .collect(); + + ids_and_levels.sort(); + + Self { ids_and_levels } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use iox_tests::ParquetFileBuilder; + + #[test] + fn saved_state_sorts_by_parquet_file_id() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + let pf_id3_level_1 = ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + let saved_state_1 = + SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]); + let saved_state_2 = + SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0, &pf_id2_level_2]); + + assert_eq!(saved_state_1, saved_state_2); + } + + #[test] + fn both_empty_parquet_files() { + let saved_state_1 = SavedParquetFileState::from([]); + let saved_state_2 = SavedParquetFileState::from([]); + + assert_eq!(saved_state_1, saved_state_2); + } + + #[test] + fn one_empty_parquet_files() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]); + let saved_state_2 = SavedParquetFileState::from([]); + + assert_ne!(saved_state_1, saved_state_2); + } + + #[test] + fn missing_files_not_equal() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + 
let pf_id3_level_1 = ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + let saved_state_1 = + SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]); + let saved_state_2 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0]); + + assert_ne!(saved_state_1, saved_state_2); + } + + #[test] + fn additional_files_not_equal() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + let pf_id3_level_1 = ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0]); + let saved_state_2 = + SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]); + + assert_ne!(saved_state_1, saved_state_2); + } + + #[test] + fn changed_compaction_level_not_equal() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id1_level_1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + let pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]); + let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_1, &pf_id2_level_2]); + + assert_ne!(saved_state_1, saved_state_2); + } + + #[test] + fn same_number_of_files_different_ids_not_equal() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id2_level_0 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id3_level_2 = ParquetFileBuilder::new(3) + 
.with_compaction_level(CompactionLevel::Final) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_2]); + let saved_state_2 = SavedParquetFileState::from([&pf_id2_level_0, &pf_id3_level_2]); + + assert_ne!(saved_state_1, saved_state_2); + } +} From 48b102f037b43bf060beb148f2697fce0b57bbc4 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 30 Mar 2023 13:55:02 -0400 Subject: [PATCH 2/5] fix: Only check that existing files continue to exist at their current compaction level --- compactor2/src/driver.rs | 11 ++--- compactor2/src/partition_info.rs | 80 ++++++++++++++++++-------------- 2 files changed, 49 insertions(+), 42 deletions(-) diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index 8ea00f9242..fdef2e2b11 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -219,11 +219,10 @@ async fn try_compact_partition( let mut files_next = files_later; // loop for each "Branch" for branch in branches { - // Keep the current state as a check to make sure this is the only compactor modifying this partition's - // files. Check that the catalog state matches this before committing and, if it doesn't match, throw away - // the compaction work we've done. - let saved_parquet_file_state = - fetch_and_save_parquet_file_state(&components, partition_id).await; + // Keep the current state as a check to make sure this is the only compactor modifying this branch's + // files. Check that the catalog state for the files in this set is the same before committing and, if not, + // throw away the compaction work we've done. 
+ let saved_parquet_file_state = SavedParquetFileState::from(&branch); let input_paths: Vec = branch.iter().map(ParquetFilePath::from).collect(); @@ -439,7 +438,7 @@ async fn update_catalog( let current_parquet_file_state = fetch_and_save_parquet_file_state(&components, partition_id).await; - if saved_parquet_file_state != current_parquet_file_state { + if saved_parquet_file_state.existing_files_modified(¤t_parquet_file_state) { // Someone else has changed the files in the catalog since we started compacting; throw away our work and // don't commit anything. return Err(Box::new(SimpleError::new( diff --git a/compactor2/src/partition_info.rs b/compactor2/src/partition_info.rs index 54835d0fe2..4900ea21b0 100644 --- a/compactor2/src/partition_info.rs +++ b/compactor2/src/partition_info.rs @@ -1,6 +1,6 @@ //! Information of a partition for compaction -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use data_types::{ CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, PartitionId, PartitionKey, Table, @@ -45,9 +45,9 @@ impl PartitionInfo { /// we assume no other compactor instance has compacted this partition and this compactor instance should commit its /// work. If the two saved states differ, throw away the work and do not commit as the Parquet files have been changed /// by some other process while this compactor instance was working. 
-#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone)] pub(crate) struct SavedParquetFileState { - ids_and_levels: Vec<(ParquetFileId, CompactionLevel)>, + ids_and_levels: HashSet<(ParquetFileId, CompactionLevel)>, } impl<'a, T> From for SavedParquetFileState @@ -55,17 +55,26 @@ where T: IntoIterator, { fn from(parquet_files: T) -> Self { - let mut ids_and_levels: Vec<_> = parquet_files + let ids_and_levels = parquet_files .into_iter() .map(|pf| (pf.id, pf.compaction_level)) .collect(); - ids_and_levels.sort(); - Self { ids_and_levels } } } +impl SavedParquetFileState { + pub fn existing_files_modified(&self, new: &SavedParquetFileState) -> bool { + let old = self; + let mut missing = old.ids_and_levels.difference(&new.ids_and_levels); + + // If there are any files in `self`/`old` that are not present in `new`, that means some files were marked + // to delete by some other process. + missing.next().is_some() + } +} + #[cfg(test)] mod tests { use super::*; @@ -88,7 +97,7 @@ mod tests { let saved_state_2 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0, &pf_id2_level_2]); - assert_eq!(saved_state_1, saved_state_2); + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); } #[test] @@ -96,11 +105,11 @@ mod tests { let saved_state_1 = SavedParquetFileState::from([]); let saved_state_2 = SavedParquetFileState::from([]); - assert_eq!(saved_state_1, saved_state_2); + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); } #[test] - fn one_empty_parquet_files() { + fn missing_files_indicates_modifications() { let pf_id1_level_0 = ParquetFileBuilder::new(1) .with_compaction_level(CompactionLevel::Initial) .build(); @@ -108,49 +117,48 @@ mod tests { let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]); let saved_state_2 = SavedParquetFileState::from([]); - assert_ne!(saved_state_1, saved_state_2); + assert!(saved_state_1.existing_files_modified(&saved_state_2)); } #[test] - fn missing_files_not_equal() { + fn 
disregard_new_files() { let pf_id1_level_0 = ParquetFileBuilder::new(1) .with_compaction_level(CompactionLevel::Initial) .build(); + + // New files of any level don't affect whether the old saved state is considered modified let pf_id2_level_2 = ParquetFileBuilder::new(2) .with_compaction_level(CompactionLevel::Final) .build(); let pf_id3_level_1 = ParquetFileBuilder::new(3) .with_compaction_level(CompactionLevel::FileNonOverlapped) .build(); - - let saved_state_1 = - SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]); - let saved_state_2 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0]); - - assert_ne!(saved_state_1, saved_state_2); - } - - #[test] - fn additional_files_not_equal() { - let pf_id1_level_0 = ParquetFileBuilder::new(1) + let pf_id4_level_0 = ParquetFileBuilder::new(1) .with_compaction_level(CompactionLevel::Initial) .build(); - let pf_id2_level_2 = ParquetFileBuilder::new(2) - .with_compaction_level(CompactionLevel::Final) - .build(); - let pf_id3_level_1 = ParquetFileBuilder::new(3) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .build(); - let saved_state_1 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0]); - let saved_state_2 = - SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]); + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]); - assert_ne!(saved_state_1, saved_state_2); + let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]); + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); + + let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_1]); + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); + + let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id4_level_0]); + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); + + let saved_state_2 = SavedParquetFileState::from([ + &pf_id1_level_0, + &pf_id2_level_2, + 
&pf_id4_level_0, + &pf_id4_level_0, + ]); + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); } #[test] - fn changed_compaction_level_not_equal() { + fn changed_compaction_level_indicates_modification() { let pf_id1_level_0 = ParquetFileBuilder::new(1) .with_compaction_level(CompactionLevel::Initial) .build(); @@ -164,11 +172,11 @@ mod tests { let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]); let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_1, &pf_id2_level_2]); - assert_ne!(saved_state_1, saved_state_2); + assert!(saved_state_1.existing_files_modified(&saved_state_2)); } #[test] - fn same_number_of_files_different_ids_not_equal() { + fn same_number_of_files_different_ids_indicates_modification() { let pf_id1_level_0 = ParquetFileBuilder::new(1) .with_compaction_level(CompactionLevel::Initial) .build(); @@ -182,6 +190,6 @@ mod tests { let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_2]); let saved_state_2 = SavedParquetFileState::from([&pf_id2_level_0, &pf_id3_level_2]); - assert_ne!(saved_state_1, saved_state_2); + assert!(saved_state_1.existing_files_modified(&saved_state_2)); } } From bf026d1f74ff3b7aa8beb3a551339957f946146e Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 30 Mar 2023 15:03:37 -0400 Subject: [PATCH 3/5] fix: Only log that we've detected changed files we're about to compact --- .../changed_files_filter/logging.rs | 33 +++ .../components/changed_files_filter/mod.rs | 213 ++++++++++++++++++ compactor2/src/components/hardcoded.rs | 7 +- compactor2/src/components/mod.rs | 14 +- compactor2/src/components/report.rs | 2 + compactor2/src/driver.rs | 24 +- compactor2/src/error.rs | 8 - compactor2/src/partition_info.rs | 161 +------------ 8 files changed, 271 insertions(+), 191 deletions(-) create mode 100644 compactor2/src/components/changed_files_filter/logging.rs create mode 100644 compactor2/src/components/changed_files_filter/mod.rs 
diff --git a/compactor2/src/components/changed_files_filter/logging.rs b/compactor2/src/components/changed_files_filter/logging.rs new file mode 100644 index 0000000000..233e834362 --- /dev/null +++ b/compactor2/src/components/changed_files_filter/logging.rs @@ -0,0 +1,33 @@ +use std::fmt::Display; + +use super::{ChangedFilesFilter, SavedParquetFileState}; + +use async_trait::async_trait; +use observability_deps::tracing::info; + +#[derive(Debug, Default, Copy, Clone)] +pub struct LoggingChangedFiles {} + +impl LoggingChangedFiles { + pub fn new() -> Self { + Self {} + } +} + +impl Display for LoggingChangedFiles { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "logging_changed_files") + } +} + +#[async_trait] +impl ChangedFilesFilter for LoggingChangedFiles { + async fn apply(&self, old: &SavedParquetFileState, new: &SavedParquetFileState) -> bool { + if old.existing_files_modified(new) { + let modified_ids_and_levels = old.modified_ids_and_levels(new); + info!(?modified_ids_and_levels, "Concurrent modification detected"); + } + + false // we're ignoring the return value anyway for the moment + } +} diff --git a/compactor2/src/components/changed_files_filter/mod.rs b/compactor2/src/components/changed_files_filter/mod.rs new file mode 100644 index 0000000000..4cb1822ae9 --- /dev/null +++ b/compactor2/src/components/changed_files_filter/mod.rs @@ -0,0 +1,213 @@ +use std::{ + collections::HashSet, + fmt::{Debug, Display}, +}; + +use async_trait::async_trait; +use data_types::{CompactionLevel, ParquetFile, ParquetFileId}; + +pub mod logging; + +/// Filters partition based on ID and Parquet files after the files have been classified. +/// +/// May return an error. In this case, the partition will be marked as "skipped". 
+#[async_trait] +pub trait ChangedFilesFilter: Debug + Display + Send + Sync { + /// Return `true` if some other process modified the files in `old` such that they don't appear or appear with a + /// different compaction level than `new`, and thus we should stop compacting. + async fn apply(&self, old: &SavedParquetFileState, new: &SavedParquetFileState) -> bool; +} + +/// Saved snapshot of a partition's Parquet files' IDs and compaction levels. Save this state at the beginning of a +/// compaction operation, then just before committing ask for the state again. If the two saved states are identical, +/// we assume no other compactor instance has compacted this partition and this compactor instance should commit its +/// work. If the two saved states differ, throw away the work and do not commit as the Parquet files have been changed +/// by some other process while this compactor instance was working. +#[derive(Debug, Clone)] +pub struct SavedParquetFileState { + ids_and_levels: HashSet<(ParquetFileId, CompactionLevel)>, +} + +impl<'a, T> From for SavedParquetFileState +where + T: IntoIterator, +{ + fn from(parquet_files: T) -> Self { + let ids_and_levels = parquet_files + .into_iter() + .map(|pf| (pf.id, pf.compaction_level)) + .collect(); + + Self { ids_and_levels } + } +} + +impl SavedParquetFileState { + fn missing<'a>( + &'a self, + new: &'a Self, + ) -> impl Iterator { + let old = self; + old.ids_and_levels.difference(&new.ids_and_levels) + } + + pub fn existing_files_modified(&self, new: &Self) -> bool { + let mut missing = self.missing(new); + // If there are any files in `self`/`old` that are not present in `new`, that means some files were marked + // to delete by some other process. 
+ missing.next().is_some() + } + + pub fn modified_ids_and_levels(&self, new: &Self) -> Vec<(ParquetFileId, CompactionLevel)> { + self.missing(new).cloned().collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use iox_tests::ParquetFileBuilder; + + #[test] + fn saved_state_sorts_by_parquet_file_id() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + let pf_id3_level_1 = ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + let saved_state_1 = + SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]); + let saved_state_2 = + SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0, &pf_id2_level_2]); + + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); + assert!(saved_state_1 + .modified_ids_and_levels(&saved_state_2) + .is_empty()); + } + + #[test] + fn both_empty_parquet_files() { + let saved_state_1 = SavedParquetFileState::from([]); + let saved_state_2 = SavedParquetFileState::from([]); + + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); + assert!(saved_state_1 + .modified_ids_and_levels(&saved_state_2) + .is_empty()); + } + + #[test] + fn missing_files_indicates_modifications() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]); + let saved_state_2 = SavedParquetFileState::from([]); + + assert!(saved_state_1.existing_files_modified(&saved_state_2)); + assert_eq!( + saved_state_1.modified_ids_and_levels(&saved_state_2), + &[(ParquetFileId::new(1), CompactionLevel::Initial)] + ); + } + + #[test] + fn disregard_new_files() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); 
+ + // New files of any level don't affect whether the old saved state is considered modified + let pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + let pf_id3_level_1 = ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + let pf_id4_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]); + + let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]); + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); + assert!(saved_state_1 + .modified_ids_and_levels(&saved_state_2) + .is_empty()); + + let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_1]); + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); + assert!(saved_state_1 + .modified_ids_and_levels(&saved_state_2) + .is_empty()); + + let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id4_level_0]); + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); + assert!(saved_state_1 + .modified_ids_and_levels(&saved_state_2) + .is_empty()); + + let saved_state_2 = SavedParquetFileState::from([ + &pf_id1_level_0, + &pf_id2_level_2, + &pf_id4_level_0, + &pf_id4_level_0, + ]); + assert!(!saved_state_1.existing_files_modified(&saved_state_2)); + assert!(saved_state_1 + .modified_ids_and_levels(&saved_state_2) + .is_empty()); + } + + #[test] + fn changed_compaction_level_indicates_modification() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id1_level_1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + let pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + + let saved_state_1 = 
SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]); + let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_1, &pf_id2_level_2]); + + assert!(saved_state_1.existing_files_modified(&saved_state_2)); + assert_eq!( + saved_state_1.modified_ids_and_levels(&saved_state_2), + &[(ParquetFileId::new(1), CompactionLevel::Initial)] + ); + } + + #[test] + fn same_number_of_files_different_ids_indicates_modification() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id2_level_0 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id3_level_2 = ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::Final) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_2]); + let saved_state_2 = SavedParquetFileState::from([&pf_id2_level_0, &pf_id3_level_2]); + + assert!(saved_state_1.existing_files_modified(&saved_state_2)); + assert_eq!( + saved_state_1.modified_ids_and_levels(&saved_state_2), + &[(ParquetFileId::new(1), CompactionLevel::Initial)] + ); + } +} diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 2d1020780e..1e5ed70284 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -18,6 +18,7 @@ use crate::{ }; use super::{ + changed_files_filter::logging::LoggingChangedFiles, combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions}, commit::{ catalog::CatalogCommit, logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper, @@ -191,10 +192,7 @@ pub fn hardcoded_components(config: &Config) -> Arc { .filter(|kind| { // use explicit match statement so we never forget to add new variants match kind { - ErrorKind::OutOfMemory - | ErrorKind::Timeout - | ErrorKind::ConcurrentModification - | ErrorKind::Unknown => true, + ErrorKind::OutOfMemory | ErrorKind::Timeout | 
ErrorKind::Unknown => true, ErrorKind::ObjectStore => false, } }) @@ -341,6 +339,7 @@ pub fn hardcoded_components(config: &Config) -> Arc { partition_resource_limit_conditions, ), ), + changed_files_filter: Arc::new(LoggingChangedFiles::new()), }) } diff --git a/compactor2/src/components/mod.rs b/compactor2/src/components/mod.rs index 963ee5f347..6b7fbd3651 100644 --- a/compactor2/src/components/mod.rs +++ b/compactor2/src/components/mod.rs @@ -1,15 +1,17 @@ use std::sync::Arc; use self::{ - commit::Commit, df_plan_exec::DataFusionPlanExec, df_planner::DataFusionPlanner, - divide_initial::DivideInitial, file_classifier::FileClassifier, ir_planner::IRPlanner, - parquet_files_sink::ParquetFilesSink, partition_done_sink::PartitionDoneSink, - partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter, - partition_info_source::PartitionInfoSource, partition_stream::PartitionStream, + changed_files_filter::ChangedFilesFilter, commit::Commit, df_plan_exec::DataFusionPlanExec, + df_planner::DataFusionPlanner, divide_initial::DivideInitial, file_classifier::FileClassifier, + ir_planner::IRPlanner, parquet_files_sink::ParquetFilesSink, + partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource, + partition_filter::PartitionFilter, partition_info_source::PartitionInfoSource, + partition_stream::PartitionStream, post_classification_partition_filter::PostClassificationPartitionFilter, round_info_source::RoundInfoSource, round_split::RoundSplit, scratchpad::ScratchpadGen, }; +pub mod changed_files_filter; pub mod combos; pub mod commit; pub mod df_plan_exec; @@ -78,4 +80,6 @@ pub struct Components { pub scratchpad_gen: Arc, /// Classify files for each compaction branch. pub file_classifier: Arc, + /// Check for other processes modifying files. 
+ pub changed_files_filter: Arc, } diff --git a/compactor2/src/components/report.rs b/compactor2/src/components/report.rs index 5e9f44c690..a9fcdcbfe3 100644 --- a/compactor2/src/components/report.rs +++ b/compactor2/src/components/report.rs @@ -110,6 +110,7 @@ pub fn log_components(components: &Components) { divide_initial, scratchpad_gen, file_classifier, + changed_files_filter, } = components; info!( @@ -129,6 +130,7 @@ pub fn log_components(components: &Components) { %divide_initial, %scratchpad_gen, %file_classifier, + %changed_files_filter, "component setup", ); } diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index fdef2e2b11..2fc920d13b 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -9,13 +9,14 @@ use tracker::InstrumentedAsyncSemaphore; use crate::{ components::{ + changed_files_filter::SavedParquetFileState, scratchpad::Scratchpad, timeout::{timeout_with_progress_checking, TimeoutWithProgress}, Components, }, error::{DynError, ErrorKind, SimpleError}, file_classification::{FileClassification, FilesForProgress, FilesToSplitOrCompact}, - partition_info::{PartitionInfo, SavedParquetFileState}, + partition_info::PartitionInfo, PlanIR, }; @@ -287,7 +288,7 @@ async fn try_compact_partition( created_file_params, target_level, ) - .await?; + .await; // Extend created files, upgraded files and files_to_keep to files_next files_next.extend(created_files); @@ -434,21 +435,14 @@ async fn update_catalog( files_to_upgrade: Vec, file_params_to_create: Vec, target_level: CompactionLevel, -) -> Result<(Vec, Vec), DynError> { +) -> (Vec, Vec) { let current_parquet_file_state = fetch_and_save_parquet_file_state(&components, partition_id).await; - if saved_parquet_file_state.existing_files_modified(¤t_parquet_file_state) { - // Someone else has changed the files in the catalog since we started compacting; throw away our work and - // don't commit anything. 
- return Err(Box::new(SimpleError::new( - ErrorKind::ConcurrentModification, - format!( - "Parquet files for partition {partition_id} have been modified since compaction started. \ - Saved: {saved_parquet_file_state:?} != Current: {current_parquet_file_state:?}" - ), - ))); - } + // Right now this only logs; in the future we might decide not to commit these changes + let _ignore = components + .changed_files_filter + .apply(&saved_parquet_file_state, &current_parquet_file_state); let created_ids = components .commit @@ -477,5 +471,5 @@ async fn update_catalog( }) .collect::>(); - Ok((created_file_params, upgraded_files)) + (created_file_params, upgraded_files) } diff --git a/compactor2/src/error.rs b/compactor2/src/error.rs index aed1982a35..3ae7be1b2b 100644 --- a/compactor2/src/error.rs +++ b/compactor2/src/error.rs @@ -23,12 +23,6 @@ pub enum ErrorKind { /// Partition took too long. Timeout, - /// Concurrent modification. - /// - /// This compactor instance expected to be the only process working on this partition's Parquet files, but the - /// Parquet files were modified while the compactor was doing work, so the work was thrown away and not committed. - ConcurrentModification, - /// Unknown/unexpected error. /// /// This will likely mark the affected partition as "skipped" and the compactor will no longer touch it. @@ -42,7 +36,6 @@ impl ErrorKind { Self::ObjectStore, Self::OutOfMemory, Self::Timeout, - Self::ConcurrentModification, Self::Unknown, ] } @@ -53,7 +46,6 @@ impl ErrorKind { Self::ObjectStore => "object_store", Self::OutOfMemory => "out_of_memory", Self::Timeout => "timeout", - Self::ConcurrentModification => "concurrent_modification", Self::Unknown => "unknown", } } diff --git a/compactor2/src/partition_info.rs b/compactor2/src/partition_info.rs index 4900ea21b0..ada8b2da31 100644 --- a/compactor2/src/partition_info.rs +++ b/compactor2/src/partition_info.rs @@ -1,11 +1,8 @@ //! 
Information of a partition for compaction -use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; -use data_types::{ - CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, PartitionId, PartitionKey, Table, - TableSchema, -}; +use data_types::{NamespaceId, PartitionId, PartitionKey, Table, TableSchema}; use schema::sort::SortKey; /// Information about the Partition being compacted @@ -39,157 +36,3 @@ impl PartitionInfo { self.table_schema.column_count() } } - -/// Saved snapshot of a partition's Parquet files' IDs and compaction levels. Save this state at the beginning of a -/// compaction operation, then just before committing ask for the state again. If the two saved states are identical, -/// we assume no other compactor instance has compacted this partition and this compactor instance should commit its -/// work. If the two saved states differ, throw away the work and do not commit as the Parquet files have been changed -/// by some other process while this compactor instance was working. -#[derive(Debug, Clone)] -pub(crate) struct SavedParquetFileState { - ids_and_levels: HashSet<(ParquetFileId, CompactionLevel)>, -} - -impl<'a, T> From for SavedParquetFileState -where - T: IntoIterator, -{ - fn from(parquet_files: T) -> Self { - let ids_and_levels = parquet_files - .into_iter() - .map(|pf| (pf.id, pf.compaction_level)) - .collect(); - - Self { ids_and_levels } - } -} - -impl SavedParquetFileState { - pub fn existing_files_modified(&self, new: &SavedParquetFileState) -> bool { - let old = self; - let mut missing = old.ids_and_levels.difference(&new.ids_and_levels); - - // If there are any files in `self`/`old` that are not present in `new`, that means some files were marked - // to delete by some other process. 
- missing.next().is_some() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use iox_tests::ParquetFileBuilder; - - #[test] - fn saved_state_sorts_by_parquet_file_id() { - let pf_id1_level_0 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::Initial) - .build(); - let pf_id2_level_2 = ParquetFileBuilder::new(2) - .with_compaction_level(CompactionLevel::Final) - .build(); - let pf_id3_level_1 = ParquetFileBuilder::new(3) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .build(); - - let saved_state_1 = - SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]); - let saved_state_2 = - SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0, &pf_id2_level_2]); - - assert!(!saved_state_1.existing_files_modified(&saved_state_2)); - } - - #[test] - fn both_empty_parquet_files() { - let saved_state_1 = SavedParquetFileState::from([]); - let saved_state_2 = SavedParquetFileState::from([]); - - assert!(!saved_state_1.existing_files_modified(&saved_state_2)); - } - - #[test] - fn missing_files_indicates_modifications() { - let pf_id1_level_0 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::Initial) - .build(); - - let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]); - let saved_state_2 = SavedParquetFileState::from([]); - - assert!(saved_state_1.existing_files_modified(&saved_state_2)); - } - - #[test] - fn disregard_new_files() { - let pf_id1_level_0 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::Initial) - .build(); - - // New files of any level don't affect whether the old saved state is considered modified - let pf_id2_level_2 = ParquetFileBuilder::new(2) - .with_compaction_level(CompactionLevel::Final) - .build(); - let pf_id3_level_1 = ParquetFileBuilder::new(3) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .build(); - let pf_id4_level_0 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::Initial) - 
.build(); - - let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]); - - let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]); - assert!(!saved_state_1.existing_files_modified(&saved_state_2)); - - let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_1]); - assert!(!saved_state_1.existing_files_modified(&saved_state_2)); - - let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id4_level_0]); - assert!(!saved_state_1.existing_files_modified(&saved_state_2)); - - let saved_state_2 = SavedParquetFileState::from([ - &pf_id1_level_0, - &pf_id2_level_2, - &pf_id4_level_0, - &pf_id4_level_0, - ]); - assert!(!saved_state_1.existing_files_modified(&saved_state_2)); - } - - #[test] - fn changed_compaction_level_indicates_modification() { - let pf_id1_level_0 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::Initial) - .build(); - let pf_id1_level_1 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .build(); - let pf_id2_level_2 = ParquetFileBuilder::new(2) - .with_compaction_level(CompactionLevel::Final) - .build(); - - let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]); - let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_1, &pf_id2_level_2]); - - assert!(saved_state_1.existing_files_modified(&saved_state_2)); - } - - #[test] - fn same_number_of_files_different_ids_indicates_modification() { - let pf_id1_level_0 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::Initial) - .build(); - let pf_id2_level_0 = ParquetFileBuilder::new(2) - .with_compaction_level(CompactionLevel::Initial) - .build(); - let pf_id3_level_2 = ParquetFileBuilder::new(3) - .with_compaction_level(CompactionLevel::Final) - .build(); - - let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_2]); - let saved_state_2 = SavedParquetFileState::from([&pf_id2_level_0, 
&pf_id3_level_2]); - - assert!(saved_state_1.existing_files_modified(&saved_state_2)); - } -} From c37f908349165f03f79f99c832a37cf50225fd8b Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 30 Mar 2023 16:02:43 -0400 Subject: [PATCH 4/5] docs: Update comments based on the new criteria for changed files --- .../src/components/changed_files_filter/mod.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/compactor2/src/components/changed_files_filter/mod.rs b/compactor2/src/components/changed_files_filter/mod.rs index 4cb1822ae9..7804b1395c 100644 --- a/compactor2/src/components/changed_files_filter/mod.rs +++ b/compactor2/src/components/changed_files_filter/mod.rs @@ -8,9 +8,7 @@ use data_types::{CompactionLevel, ParquetFile, ParquetFileId}; pub mod logging; -/// Filters partition based on ID and Parquet files after the files have been classified. -/// -/// May return an error. In this case, the partition will be marked as "skipped". +/// Returns `true` if the files in the saved state have been changed according to the current state. #[async_trait] pub trait ChangedFilesFilter: Debug + Display + Send + Sync { /// Return `true` if some other process modified the files in `old` such that they don't appear or appear with a @@ -19,10 +17,12 @@ pub trait ChangedFilesFilter: Debug + Display + Send + Sync { } /// Saved snapshot of a partition's Parquet files' IDs and compaction levels. Save this state at the beginning of a -/// compaction operation, then just before committing ask for the state again. If the two saved states are identical, -/// we assume no other compactor instance has compacted this partition and this compactor instance should commit its -/// work. If the two saved states differ, throw away the work and do not commit as the Parquet files have been changed -/// by some other process while this compactor instance was working. 
+/// compaction operation, then just before committing ask for the catalog state again. If the ID+compaction level pairs +/// in the initial saved state still appear in the latest catalog state (disregarding any new files that may appear in +/// the latest catalog state) we assume no other compactor instance has compacted the relevant files and this compactor +/// instance should commit its work. If any old ID+compaction level pairs are missing from the latest catalog state +/// (and thus show up in a set difference operation of `old - current`), throw away the work and do not commit as the +/// relevant Parquet files have been changed by some other process while this compactor instance was working. #[derive(Debug, Clone)] pub struct SavedParquetFileState { ids_and_levels: HashSet<(ParquetFileId, CompactionLevel)>, @@ -53,8 +53,8 @@ impl SavedParquetFileState { pub fn existing_files_modified(&self, new: &Self) -> bool { let mut missing = self.missing(new); - // If there are any files in `self`/`old` that are not present in `new`, that means some files were marked - // to delete by some other process. + // If there are any `(ParquetFileId, CompactionLevel)` pairs in `self` that are not present in `new`, that + // means some files were marked to delete or had their compaction level changed by some other process. 
missing.next().is_some() } From 8718aaa148711cf329973c46e471ae3e818f4ab9 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 30 Mar 2023 16:02:56 -0400 Subject: [PATCH 5/5] fix: Change test file ID to match intent --- compactor2/src/components/changed_files_filter/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compactor2/src/components/changed_files_filter/mod.rs b/compactor2/src/components/changed_files_filter/mod.rs index 7804b1395c..03a33e8936 100644 --- a/compactor2/src/components/changed_files_filter/mod.rs +++ b/compactor2/src/components/changed_files_filter/mod.rs @@ -131,7 +131,7 @@ mod tests { let pf_id3_level_1 = ParquetFileBuilder::new(3) .with_compaction_level(CompactionLevel::FileNonOverlapped) .build(); - let pf_id4_level_0 = ParquetFileBuilder::new(1) + let pf_id4_level_0 = ParquetFileBuilder::new(4) .with_compaction_level(CompactionLevel::Initial) .build();