diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs
index 36cc60e620..2d1020780e 100644
--- a/compactor2/src/components/hardcoded.rs
+++ b/compactor2/src/components/hardcoded.rs
@@ -191,7 +191,10 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
                 .filter(|kind| {
                     // use explicit match statement so we never forget to add new variants
                     match kind {
-                        ErrorKind::OutOfMemory | ErrorKind::Timeout | ErrorKind::Unknown => true,
+                        ErrorKind::OutOfMemory
+                        | ErrorKind::Timeout
+                        | ErrorKind::ConcurrentModification
+                        | ErrorKind::Unknown => true,
                         ErrorKind::ObjectStore => false,
                     }
                 })
diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs
index 831f0b6324..cf264cbb86 100644
--- a/compactor2/src/driver.rs
+++ b/compactor2/src/driver.rs
@@ -15,7 +15,7 @@ use crate::{
     },
     error::{DynError, ErrorKind, SimpleError},
     file_classification::{FileClassification, FilesForProgress, FilesToSplitOrCompact},
-    partition_info::PartitionInfo,
+    partition_info::{PartitionInfo, SavedParquetFileState},
     PlanIR,
 };
@@ -219,6 +219,12 @@ async fn try_compact_partition(
     let mut files_next = files_later;
     // loop for each "Branch"
     for branch in branches {
+        // Keep the current state as a check to make sure this is the only compactor modifying this partition's
+        // files. Check that the catalog state matches this before committing and, if it doesn't match, throw away
+        // the compaction work we've done.
+        let catalog_files = components.partition_files_source.fetch(partition_id).await;
+        let saved_parquet_file_state = SavedParquetFileState::from(catalog_files.iter());
+
         let input_paths: Vec<ParquetFilePath> = branch.iter().map(ParquetFilePath::from).collect();
@@ -276,12 +282,13 @@ async fn try_compact_partition(
             let (created_files, upgraded_files) = update_catalog(
                 Arc::clone(&components),
                 partition_id,
+                saved_parquet_file_state,
                 files_to_delete,
                 upgrade,
                 created_file_params,
                 target_level,
             )
-            .await;
+            .await?;
 
             // Extend created files, upgraded files and files_to_keep to files_next
             files_next.extend(created_files);
@@ -415,11 +422,27 @@ async fn upload_files_to_object_store(
 async fn update_catalog(
     components: Arc<Components>,
     partition_id: PartitionId,
+    saved_parquet_file_state: SavedParquetFileState,
     files_to_delete: Vec<ParquetFile>,
     files_to_upgrade: Vec<ParquetFile>,
     file_params_to_create: Vec<ParquetFileParams>,
     target_level: CompactionLevel,
-) -> (Vec<ParquetFile>, Vec<ParquetFile>) {
+) -> Result<(Vec<ParquetFile>, Vec<ParquetFile>), DynError> {
+    let catalog_files = components.partition_files_source.fetch(partition_id).await;
+    let current_parquet_file_state = SavedParquetFileState::from(&catalog_files);
+
+    if saved_parquet_file_state != current_parquet_file_state {
+        // Someone else has changed the files in the catalog since we started compacting; throw away our work and
+        // don't commit anything.
+        return Err(Box::new(SimpleError::new(
+            ErrorKind::ConcurrentModification,
+            format!(
+                "Parquet files for partition {partition_id} have been modified since compaction started. \
+                 Saved: {saved_parquet_file_state:?} != Current: {current_parquet_file_state:?}"
+            ),
+        )));
+    }
+
     let created_ids = components
         .commit
         .commit(
@@ -447,5 +470,5 @@ async fn update_catalog(
         })
         .collect::<Vec<_>>();
 
-    (created_file_params, upgraded_files)
+    Ok((created_file_params, upgraded_files))
 }
diff --git a/compactor2/src/error.rs b/compactor2/src/error.rs
index 3ae7be1b2b..aed1982a35 100644
--- a/compactor2/src/error.rs
+++ b/compactor2/src/error.rs
@@ -23,6 +23,12 @@ pub enum ErrorKind {
     /// Partition took too long.
     Timeout,
 
+    /// Concurrent modification.
+    ///
+    /// This compactor instance expected to be the only process working on this partition's Parquet files, but the
+    /// Parquet files were modified while the compactor was doing work, so the work was thrown away and not committed.
+    ConcurrentModification,
+
     /// Unknown/unexpected error.
     ///
     /// This will likely mark the affected partition as "skipped" and the compactor will no longer touch it.
@@ -36,6 +42,7 @@ impl ErrorKind {
             Self::ObjectStore,
             Self::OutOfMemory,
             Self::Timeout,
+            Self::ConcurrentModification,
             Self::Unknown,
         ]
     }
@@ -46,6 +53,7 @@ impl ErrorKind {
             Self::ObjectStore => "object_store",
             Self::OutOfMemory => "out_of_memory",
             Self::Timeout => "timeout",
+            Self::ConcurrentModification => "concurrent_modification",
             Self::Unknown => "unknown",
         }
     }
diff --git a/compactor2/src/partition_info.rs b/compactor2/src/partition_info.rs
index ada8b2da31..087061a136 100644
--- a/compactor2/src/partition_info.rs
+++ b/compactor2/src/partition_info.rs
@@ -2,7 +2,10 @@
 
 use std::sync::Arc;
 
-use data_types::{NamespaceId, PartitionId, PartitionKey, Table, TableSchema};
+use data_types::{
+    CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, PartitionId, PartitionKey, Table,
+    TableSchema,
+};
 use schema::sort::SortKey;
 
 /// Information about the Partition being compacted
@@ -36,3 +39,108 @@ impl PartitionInfo {
         self.table_schema.column_count()
     }
 }
+
+/// Saved snapshot of a partition's Parquet files' IDs and compaction levels. Save this state at the beginning of a
+/// compaction operation, then just before committing ask for the state again. If the two saved states are identical,
+/// we assume no other compactor instance has compacted this partition and this compactor instance should commit its
+/// work. If the two saved states differ, throw away the work and do not commit as the Parquet files have been changed
+/// by some other process while this compactor instance was working.
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct SavedParquetFileState {
+    ids_and_levels: Vec<(ParquetFileId, CompactionLevel)>,
+}
+
+impl<'a, T> From<T> for SavedParquetFileState
+where
+    T: IntoIterator<Item = &'a ParquetFile>,
+{
+    fn from(parquet_files: T) -> Self {
+        let mut ids_and_levels: Vec<_> = parquet_files
+            .into_iter()
+            .map(|pf| (pf.id, pf.compaction_level))
+            .collect();
+
+        ids_and_levels.sort();
+
+        Self { ids_and_levels }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iox_tests::ParquetFileBuilder;
+
+    #[test]
+    fn saved_state_sorts_by_parquet_file_id() {
+        let pf_id1_level_0 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+        let pf_id2_level_2 = ParquetFileBuilder::new(2)
+            .with_compaction_level(CompactionLevel::Final)
+            .build();
+        let pf_id3_level_1 = ParquetFileBuilder::new(3)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .build();
+
+        let saved_state_1 =
+            SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]);
+        let saved_state_2 =
+            SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0, &pf_id2_level_2]);
+
+        assert_eq!(saved_state_1, saved_state_2);
+    }
+
+    #[test]
+    fn missing_files_not_equal() {
+        let pf_id1_level_0 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+        let pf_id2_level_2 = ParquetFileBuilder::new(2)
+            .with_compaction_level(CompactionLevel::Final)
+            .build();
+        let pf_id3_level_1 = ParquetFileBuilder::new(3)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .build();
+
+        let saved_state_1 =
+            SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]);
+        let saved_state_2 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0]);
+
+        assert_ne!(saved_state_1, saved_state_2);
+    }
+
+    #[test]
+    fn additional_files_not_equal() {
+        let pf_id1_level_0 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+        let pf_id2_level_2 = ParquetFileBuilder::new(2)
+            .with_compaction_level(CompactionLevel::Final)
+            .build();
+        let pf_id3_level_1 = ParquetFileBuilder::new(3)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .build();
+
+        let saved_state_1 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0]);
+        let saved_state_2 =
+            SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]);
+
+        assert_ne!(saved_state_1, saved_state_2);
+    }
+
+    #[test]
+    fn changed_compaction_level_not_equal() {
+        let pf_id1_level_0 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+        let pf_id1_level_1 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .build();
+
+        let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]);
+        let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_1]);
+
+        assert_ne!(saved_state_1, saved_state_2);
+    }
+}
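
The check added above reduces to value equality between two sorted (id, level) snapshots: record one when work on a branch starts, take another just before committing, and abandon the work if they differ. Below is a minimal, self-contained sketch of that flow, not the driver code itself: plain u64/u8 tuples stand in for ParquetFileId/CompactionLevel, and hard-coded vectors stand in for the two catalog fetches.

// Sketch only: u64/u8 stand in for ParquetFileId/CompactionLevel; the
// hard-coded vectors stand in for fetching the partition's files twice.
#[derive(Debug, Clone, PartialEq)]
struct Snapshot {
    ids_and_levels: Vec<(u64, u8)>,
}

impl Snapshot {
    fn new(mut ids_and_levels: Vec<(u64, u8)>) -> Self {
        // Sorting makes equality independent of the order in which the
        // catalog returned the files.
        ids_and_levels.sort();
        Self { ids_and_levels }
    }
}

fn main() {
    // Snapshot taken when compaction of the branch started.
    let saved = Snapshot::new(vec![(1, 0), (3, 1), (2, 2)]);
    // Snapshot taken just before committing: another process has meanwhile
    // moved file 3 from level 1 to level 2.
    let current = Snapshot::new(vec![(1, 0), (2, 2), (3, 2)]);

    if saved != current {
        // The equivalent of returning ErrorKind::ConcurrentModification:
        // throw the branch's work away instead of committing it.
        eprintln!("concurrent modification: {saved:?} != {current:?}");
    }
}

Comparing (id, level) pairs catches deleted files, newly created files, and level upgrades alike, which is what the four tests above assert.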