From 8b360e013a97cabe2ae162095db678ade3cc252d Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 29 Mar 2023 13:15:28 -0400 Subject: [PATCH 1/4] fix: Split long line in a macro that rustfmt doesn't handle --- compactor2/src/compactor.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/compactor2/src/compactor.rs b/compactor2/src/compactor.rs index 87a3642676..b57635e109 100644 --- a/compactor2/src/compactor.rs +++ b/compactor2/src/compactor.rs @@ -56,7 +56,12 @@ impl Compactor2 { tokio::select! { _ = shutdown_captured.cancelled() => {} _ = async { - compact(config.partition_concurrency, config.partition_timeout, Arc::clone(&job_semaphore), &components).await; + compact( + config.partition_concurrency, + config.partition_timeout, + Arc::clone(&job_semaphore), + &components + ).await; info!("compactor done"); } => {} From 03e5b1ac6aa855cbfa76f92ae96bf93c60868e4b Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Wed, 29 Mar 2023 13:46:20 -0400 Subject: [PATCH 2/4] feat: Save Parquet file state and check it remains the same before committing compaction --- compactor2/src/components/hardcoded.rs | 5 +- compactor2/src/driver.rs | 31 ++++++- compactor2/src/error.rs | 8 ++ compactor2/src/partition_info.rs | 110 ++++++++++++++++++++++++- 4 files changed, 148 insertions(+), 6 deletions(-) diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 36cc60e620..2d1020780e 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -191,7 +191,10 @@ pub fn hardcoded_components(config: &Config) -> Arc { .filter(|kind| { // use explicit match statement so we never forget to add new variants match kind { - ErrorKind::OutOfMemory | ErrorKind::Timeout | ErrorKind::Unknown => true, + ErrorKind::OutOfMemory + | ErrorKind::Timeout + | ErrorKind::ConcurrentModification + | ErrorKind::Unknown => true, ErrorKind::ObjectStore => false, } }) diff --git 
a/compactor2/src/driver.rs b/compactor2/src/driver.rs index 831f0b6324..cf264cbb86 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -15,7 +15,7 @@ use crate::{ }, error::{DynError, ErrorKind, SimpleError}, file_classification::{FileClassification, FilesForProgress, FilesToSplitOrCompact}, - partition_info::PartitionInfo, + partition_info::{PartitionInfo, SavedParquetFileState}, PlanIR, }; @@ -219,6 +219,12 @@ async fn try_compact_partition( let mut files_next = files_later; // loop for each "Branch" for branch in branches { + // Keep the current state as a check to make sure this is the only compactor modifying this partition's + // files. Check that the catalog state matches this before committing and, if it doesn't match, throw away + // the compaction work we've done. + let catalog_files = components.partition_files_source.fetch(partition_id).await; + let saved_parquet_file_state = SavedParquetFileState::from(catalog_files.iter()); + let input_paths: Vec = branch.iter().map(ParquetFilePath::from).collect(); @@ -276,12 +282,13 @@ async fn try_compact_partition( let (created_files, upgraded_files) = update_catalog( Arc::clone(&components), partition_id, + saved_parquet_file_state, files_to_delete, upgrade, created_file_params, target_level, ) - .await; + .await?; // Extend created files, upgraded files and files_to_keep to files_next files_next.extend(created_files); @@ -415,11 +422,27 @@ async fn upload_files_to_object_store( async fn update_catalog( components: Arc, partition_id: PartitionId, + saved_parquet_file_state: SavedParquetFileState, files_to_delete: Vec, files_to_upgrade: Vec, file_params_to_create: Vec, target_level: CompactionLevel, -) -> (Vec, Vec) { +) -> Result<(Vec, Vec), DynError> { + let catalog_files = components.partition_files_source.fetch(partition_id).await; + let current_parquet_file_state = SavedParquetFileState::from(&catalog_files); + + if saved_parquet_file_state != current_parquet_file_state { + // Someone else 
has changed the files in the catalog since we started compacting; throw away our work and + // don't commit anything. + return Err(Box::new(SimpleError::new( + ErrorKind::ConcurrentModification, + format!( + "Parquet files for partition {partition_id} have been modified since compaction started. \ + Saved: {saved_parquet_file_state:?} != Current: {current_parquet_file_state:?}" + ), + ))); + } + let created_ids = components .commit .commit( @@ -447,5 +470,5 @@ async fn update_catalog( }) .collect::<Vec<_>>(); - (created_file_params, upgraded_files) + Ok((created_file_params, upgraded_files)) } diff --git a/compactor2/src/error.rs b/compactor2/src/error.rs index 3ae7be1b2b..aed1982a35 100644 --- a/compactor2/src/error.rs +++ b/compactor2/src/error.rs @@ -23,6 +23,12 @@ pub enum ErrorKind { /// Partition took too long. Timeout, + /// Concurrent modification. + /// + /// This compactor instance expected to be the only process working on this partition's Parquet files, but the + /// Parquet files were modified while the compactor was doing work, so the work was thrown away and not committed. + ConcurrentModification, + /// Unknown/unexpected error. /// /// This will likely mark the affected partition as "skipped" and the compactor will no longer touch it. 
@@ -36,6 +42,7 @@ impl ErrorKind { Self::ObjectStore, Self::OutOfMemory, Self::Timeout, + Self::ConcurrentModification, Self::Unknown, ] } @@ -46,6 +53,7 @@ impl ErrorKind { Self::ObjectStore => "object_store", Self::OutOfMemory => "out_of_memory", Self::Timeout => "timeout", + Self::ConcurrentModification => "concurrent_modification", Self::Unknown => "unknown", } } diff --git a/compactor2/src/partition_info.rs b/compactor2/src/partition_info.rs index ada8b2da31..087061a136 100644 --- a/compactor2/src/partition_info.rs +++ b/compactor2/src/partition_info.rs @@ -2,7 +2,10 @@ use std::sync::Arc; -use data_types::{NamespaceId, PartitionId, PartitionKey, Table, TableSchema}; +use data_types::{ + CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, PartitionId, PartitionKey, Table, + TableSchema, +}; use schema::sort::SortKey; /// Information about the Partition being compacted @@ -36,3 +39,108 @@ impl PartitionInfo { self.table_schema.column_count() } } + +/// Saved snapshot of a partition's Parquet files' IDs and compaction levels. Save this state at the beginning of a +/// compaction operation, then just before committing ask for the state again. If the two saved states are identical, +/// we assume no other compactor instance has compacted this partition and this compactor instance should commit its +/// work. If the two saved states differ, throw away the work and do not commit as the Parquet files have been changed +/// by some other process while this compactor instance was working. 
+#[derive(Debug, Clone, PartialEq)] +pub(crate) struct SavedParquetFileState { + ids_and_levels: Vec<(ParquetFileId, CompactionLevel)>, +} + +impl<'a, T> From<T> for SavedParquetFileState +where + T: IntoIterator<Item = &'a ParquetFile>, +{ + fn from(parquet_files: T) -> Self { + let mut ids_and_levels: Vec<_> = parquet_files + .into_iter() + .map(|pf| (pf.id, pf.compaction_level)) + .collect(); + + ids_and_levels.sort(); + + Self { ids_and_levels } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use iox_tests::ParquetFileBuilder; + + #[test] + fn saved_state_sorts_by_parquet_file_id() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + let pf_id3_level_1 = ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + let saved_state_1 = + SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]); + let saved_state_2 = + SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0, &pf_id2_level_2]); + + assert_eq!(saved_state_1, saved_state_2); + } + + #[test] + fn missing_files_not_equal() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + let pf_id3_level_1 = ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + let saved_state_1 = + SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]); + let saved_state_2 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0]); + + assert_ne!(saved_state_1, saved_state_2); + } + + #[test] + fn additional_files_not_equal() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let 
pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + let pf_id3_level_1 = ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0]); + let saved_state_2 = + SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]); + + assert_ne!(saved_state_1, saved_state_2); + } + + #[test] + fn changed_compaction_level_not_equal() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id1_level_1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]); + let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_1]); + + assert_ne!(saved_state_1, saved_state_2); + } +} From a0890bf8d33a989378d98477b1bcb5317221ff74 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 30 Mar 2023 10:58:27 -0400 Subject: [PATCH 3/4] refactor: Extract a function for repeated code to get and save parquet file state --- compactor2/src/driver.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index cf264cbb86..8ea00f9242 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -222,8 +222,8 @@ async fn try_compact_partition( // Keep the current state as a check to make sure this is the only compactor modifying this partition's // files. Check that the catalog state matches this before committing and, if it doesn't match, throw away // the compaction work we've done. 
- let catalog_files = components.partition_files_source.fetch(partition_id).await; - let saved_parquet_file_state = SavedParquetFileState::from(catalog_files.iter()); + let saved_parquet_file_state = + fetch_and_save_parquet_file_state(&components, partition_id).await; let input_paths: Vec = branch.iter().map(ParquetFilePath::from).collect(); @@ -416,6 +416,14 @@ async fn upload_files_to_object_store( .collect() } +async fn fetch_and_save_parquet_file_state( + components: &Components, + partition_id: PartitionId, +) -> SavedParquetFileState { + let catalog_files = components.partition_files_source.fetch(partition_id).await; + SavedParquetFileState::from(&catalog_files) +} + /// Update the catalog to create, soft delete and upgrade corresponding given input /// to provided target level /// Return created and upgraded files @@ -428,8 +436,8 @@ async fn update_catalog( file_params_to_create: Vec, target_level: CompactionLevel, ) -> Result<(Vec, Vec), DynError> { - let catalog_files = components.partition_files_source.fetch(partition_id).await; - let current_parquet_file_state = SavedParquetFileState::from(&catalog_files); + let current_parquet_file_state = + fetch_and_save_parquet_file_state(&components, partition_id).await; if saved_parquet_file_state != current_parquet_file_state { // Someone else has changed the files in the catalog since we started compacting; throw away our work and From 68abc42cda547fb3856a2a602c4dd4d1d74b9a0d Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 30 Mar 2023 11:04:51 -0400 Subject: [PATCH 4/4] test: Add more unit cases for SavedParquetFileState comparisons --- compactor2/src/partition_info.rs | 45 ++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/compactor2/src/partition_info.rs b/compactor2/src/partition_info.rs index 087061a136..54835d0fe2 100644 --- a/compactor2/src/partition_info.rs +++ b/compactor2/src/partition_info.rs @@ -91,6 +91,26 @@ mod tests { 
assert_eq!(saved_state_1, saved_state_2); } + #[test] + fn both_empty_parquet_files() { + let saved_state_1 = SavedParquetFileState::from([]); + let saved_state_2 = SavedParquetFileState::from([]); + + assert_eq!(saved_state_1, saved_state_2); + } + + #[test] + fn one_empty_parquet_files() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]); + let saved_state_2 = SavedParquetFileState::from([]); + + assert_ne!(saved_state_1, saved_state_2); + } + #[test] fn missing_files_not_equal() { let pf_id1_level_0 = ParquetFileBuilder::new(1) @@ -137,9 +157,30 @@ mod tests { let pf_id1_level_1 = ParquetFileBuilder::new(1) .with_compaction_level(CompactionLevel::FileNonOverlapped) .build(); + let pf_id2_level_2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); - let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]); - let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_1]); + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]); + let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_1, &pf_id2_level_2]); + + assert_ne!(saved_state_1, saved_state_2); + } + + #[test] + fn same_number_of_files_different_ids_not_equal() { + let pf_id1_level_0 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id2_level_0 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let pf_id3_level_2 = ParquetFileBuilder::new(3) + .with_compaction_level(CompactionLevel::Final) + .build(); + + let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_2]); + let saved_state_2 = SavedParquetFileState::from([&pf_id2_level_0, &pf_id3_level_2]); assert_ne!(saved_state_1, saved_state_2); }