revert: "revert: Merge pull request #7369 from influxdata/cn/parquet-file-saved-status"

This reverts commit 0d7393f2c1.
Carol (Nichols || Goulding) 2023-03-30 12:39:03 -04:00
parent 0d7393f2c1
commit 956d7bcee4
5 changed files with 203 additions and 7 deletions

@@ -56,7 +56,12 @@ impl Compactor2 {
         tokio::select! {
             _ = shutdown_captured.cancelled() => {}
             _ = async {
-                compact(config.partition_concurrency, config.partition_timeout, Arc::clone(&job_semaphore), &components).await;
+                compact(
+                    config.partition_concurrency,
+                    config.partition_timeout,
+                    Arc::clone(&job_semaphore),
+                    &components
+                ).await;
 
                 info!("compactor done");
             } => {}

@@ -191,7 +191,10 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
                 .filter(|kind| {
                     // use explicit match statement so we never forget to add new variants
                     match kind {
-                        ErrorKind::OutOfMemory | ErrorKind::Timeout | ErrorKind::Unknown => true,
+                        ErrorKind::OutOfMemory
+                        | ErrorKind::Timeout
+                        | ErrorKind::ConcurrentModification
+                        | ErrorKind::Unknown => true,
                         ErrorKind::ObjectStore => false,
                     }
                 })
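
The `// use explicit match statement` comment records the design choice here: with every variant written out, adding a new `ErrorKind` such as `ConcurrentModification` is a compile error at this filter until someone classifies it, instead of being silently swallowed by a wildcard arm. A small stand-alone illustration (stand-in types, not the repo's):

    enum Kind {
        ObjectStore,
        OutOfMemory,
        Timeout,
        Unknown,
    }

    // Adding `Kind::ConcurrentModification` to the enum makes this match
    // non-exhaustive and the build fails until the new variant is placed
    // in one of the arms; a `_ => true` arm would have hidden the decision.
    fn keep(kind: &Kind) -> bool {
        match kind {
            Kind::OutOfMemory | Kind::Timeout | Kind::Unknown => true,
            Kind::ObjectStore => false,
        }
    }

    fn main() {
        assert!(keep(&Kind::Timeout));
        assert!(!keep(&Kind::ObjectStore));
    }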

@@ -15,7 +15,7 @@ use crate::{
     },
     error::{DynError, ErrorKind, SimpleError},
     file_classification::{FileClassification, FilesForProgress, FilesToSplitOrCompact},
-    partition_info::PartitionInfo,
+    partition_info::{PartitionInfo, SavedParquetFileState},
     PlanIR,
 };
@@ -219,6 +219,12 @@ async fn try_compact_partition(
     let mut files_next = files_later;
     // loop for each "Branch"
     for branch in branches {
+        // Keep the current state as a check to make sure this is the only compactor modifying this partition's
+        // files. Check that the catalog state matches this before committing and, if it doesn't match, throw away
+        // the compaction work we've done.
+        let saved_parquet_file_state =
+            fetch_and_save_parquet_file_state(&components, partition_id).await;
+
         let input_paths: Vec<ParquetFilePath> =
             branch.iter().map(ParquetFilePath::from).collect();
@@ -276,12 +282,13 @@
         let (created_files, upgraded_files) = update_catalog(
             Arc::clone(&components),
             partition_id,
+            saved_parquet_file_state,
             files_to_delete,
             upgrade,
             created_file_params,
             target_level,
         )
-        .await;
+        .await?;
 
         // Extend created files, upgraded files and files_to_keep to files_next
         files_next.extend(created_files);
@@ -409,17 +416,41 @@ async fn upload_files_to_object_store(
         .collect()
 }
 
+async fn fetch_and_save_parquet_file_state(
+    components: &Components,
+    partition_id: PartitionId,
+) -> SavedParquetFileState {
+    let catalog_files = components.partition_files_source.fetch(partition_id).await;
+    SavedParquetFileState::from(&catalog_files)
+}
+
 /// Update the catalog to create, soft delete and upgrade corresponding given input
 /// to provided target level
 /// Return created and upgraded files
 async fn update_catalog(
     components: Arc<Components>,
     partition_id: PartitionId,
+    saved_parquet_file_state: SavedParquetFileState,
     files_to_delete: Vec<ParquetFile>,
     files_to_upgrade: Vec<ParquetFile>,
     file_params_to_create: Vec<ParquetFileParams>,
     target_level: CompactionLevel,
-) -> (Vec<ParquetFile>, Vec<ParquetFile>) {
+) -> Result<(Vec<ParquetFile>, Vec<ParquetFile>), DynError> {
+    let current_parquet_file_state =
+        fetch_and_save_parquet_file_state(&components, partition_id).await;
+
+    if saved_parquet_file_state != current_parquet_file_state {
+        // Someone else has changed the files in the catalog since we started compacting; throw away our work and
+        // don't commit anything.
+        return Err(Box::new(SimpleError::new(
+            ErrorKind::ConcurrentModification,
+            format!(
+                "Parquet files for partition {partition_id} have been modified since compaction started. \
+                Saved: {saved_parquet_file_state:?} != Current: {current_parquet_file_state:?}"
+            ),
+        )));
+    }
+
     let created_ids = components
         .commit
         .commit(
@@ -447,5 +478,5 @@ async fn update_catalog(
         })
         .collect::<Vec<_>>();
 
-    (created_file_params, upgraded_files)
+    Ok((created_file_params, upgraded_files))
 }
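
Taken together, `fetch_and_save_parquet_file_state` and the inequality check in `update_catalog` amount to optimistic concurrency control: snapshot the catalog state, do the compaction work, re-read, and commit only if nothing moved in the meantime. A minimal self-contained sketch of that pattern, with `Snapshot`, `fetch_state`, and `commit_if_unchanged` as illustrative names rather than code from this repo:

    // (file id, compaction level) pairs, kept sorted for order-independent
    // comparison, mirroring what SavedParquetFileState stores.
    #[derive(Debug, Clone, PartialEq)]
    struct Snapshot(Vec<(i64, u8)>);

    fn fetch_state(catalog: &[(i64, u8)]) -> Snapshot {
        let mut pairs = catalog.to_vec();
        pairs.sort();
        Snapshot(pairs)
    }

    fn commit_if_unchanged(saved: &Snapshot, catalog: &[(i64, u8)]) -> Result<(), String> {
        let current = fetch_state(catalog);
        if *saved != current {
            // Someone else touched the files; discard this round of work.
            return Err(format!("catalog changed: {saved:?} != {current:?}"));
        }
        // ... the real commit would happen here ...
        Ok(())
    }

    fn main() {
        let catalog = vec![(1, 0), (2, 2)];
        let saved = fetch_state(&catalog);

        // Nothing changed while "compacting": the commit goes through.
        assert!(commit_if_unchanged(&saved, &catalog).is_ok());

        // A concurrent compactor upgraded file 1 to level 1: refuse to commit.
        let changed = vec![(1, 1), (2, 2)];
        assert!(commit_if_unchanged(&saved, &changed).is_err());
    }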

@@ -23,6 +23,12 @@ pub enum ErrorKind {
     /// Partition took too long.
     Timeout,
 
+    /// Concurrent modification.
+    ///
+    /// This compactor instance expected to be the only process working on this partition's Parquet files, but the
+    /// Parquet files were modified while the compactor was doing work, so the work was thrown away and not committed.
+    ConcurrentModification,
+
     /// Unknown/unexpected error.
     ///
     /// This will likely mark the affected partition as "skipped" and the compactor will no longer touch it.
@@ -36,6 +42,7 @@ impl ErrorKind {
             Self::ObjectStore,
             Self::OutOfMemory,
             Self::Timeout,
+            Self::ConcurrentModification,
             Self::Unknown,
         ]
     }
@@ -46,6 +53,7 @@ impl ErrorKind {
             Self::ObjectStore => "object_store",
             Self::OutOfMemory => "out_of_memory",
             Self::Timeout => "timeout",
+            Self::ConcurrentModification => "concurrent_modification",
             Self::Unknown => "unknown",
         }
     }
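
Nothing forces `variants()` and `name()` to stay in sync with the enum beyond these hand-maintained lists, so a guard test can be worth having. A hedged sketch, assuming `variants()` returns the full set of kinds and `name()` the `&'static str` labels shown above:

    #[test]
    fn variant_names_are_unique() {
        use std::collections::HashSet;

        // If two variants were ever given the same label (say, for metrics),
        // the deduplicating set would come up short of the variant count.
        let all = ErrorKind::variants();
        let names: HashSet<_> = all.iter().map(|kind| kind.name()).collect();
        assert_eq!(names.len(), all.len());
    }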

@@ -2,7 +2,10 @@
 
 use std::sync::Arc;
 
-use data_types::{NamespaceId, PartitionId, PartitionKey, Table, TableSchema};
+use data_types::{
+    CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, PartitionId, PartitionKey, Table,
+    TableSchema,
+};
 use schema::sort::SortKey;
 
 /// Information about the Partition being compacted
@@ -36,3 +39,149 @@ impl PartitionInfo {
         self.table_schema.column_count()
     }
 }
+
+/// Saved snapshot of a partition's Parquet files' IDs and compaction levels. Save this state at the beginning of a
+/// compaction operation, then just before committing ask for the state again. If the two saved states are identical,
+/// we assume no other compactor instance has compacted this partition and this compactor instance should commit its
+/// work. If the two saved states differ, throw away the work and do not commit as the Parquet files have been changed
+/// by some other process while this compactor instance was working.
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct SavedParquetFileState {
+    ids_and_levels: Vec<(ParquetFileId, CompactionLevel)>,
+}
+
+impl<'a, T> From<T> for SavedParquetFileState
+where
+    T: IntoIterator<Item = &'a ParquetFile>,
+{
+    fn from(parquet_files: T) -> Self {
+        let mut ids_and_levels: Vec<_> = parquet_files
+            .into_iter()
+            .map(|pf| (pf.id, pf.compaction_level))
+            .collect();
+
+        ids_and_levels.sort();
+
+        Self { ids_and_levels }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iox_tests::ParquetFileBuilder;
+
+    #[test]
+    fn saved_state_sorts_by_parquet_file_id() {
+        let pf_id1_level_0 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+        let pf_id2_level_2 = ParquetFileBuilder::new(2)
+            .with_compaction_level(CompactionLevel::Final)
+            .build();
+        let pf_id3_level_1 = ParquetFileBuilder::new(3)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .build();
+
+        let saved_state_1 =
+            SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]);
+        let saved_state_2 =
+            SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0, &pf_id2_level_2]);
+
+        assert_eq!(saved_state_1, saved_state_2);
+    }
+
+    #[test]
+    fn both_empty_parquet_files() {
+        let saved_state_1 = SavedParquetFileState::from([]);
+        let saved_state_2 = SavedParquetFileState::from([]);
+
+        assert_eq!(saved_state_1, saved_state_2);
+    }
+
+    #[test]
+    fn one_empty_parquet_files() {
+        let pf_id1_level_0 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+
+        let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0]);
+        let saved_state_2 = SavedParquetFileState::from([]);
+
+        assert_ne!(saved_state_1, saved_state_2);
+    }
+
+    #[test]
+    fn missing_files_not_equal() {
+        let pf_id1_level_0 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+        let pf_id2_level_2 = ParquetFileBuilder::new(2)
+            .with_compaction_level(CompactionLevel::Final)
+            .build();
+        let pf_id3_level_1 = ParquetFileBuilder::new(3)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .build();
+
+        let saved_state_1 =
+            SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]);
+        let saved_state_2 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0]);
+
+        assert_ne!(saved_state_1, saved_state_2);
+    }
+
+    #[test]
+    fn additional_files_not_equal() {
+        let pf_id1_level_0 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+        let pf_id2_level_2 = ParquetFileBuilder::new(2)
+            .with_compaction_level(CompactionLevel::Final)
+            .build();
+        let pf_id3_level_1 = ParquetFileBuilder::new(3)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .build();
+
+        let saved_state_1 = SavedParquetFileState::from([&pf_id3_level_1, &pf_id1_level_0]);
+        let saved_state_2 =
+            SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2, &pf_id3_level_1]);
+
+        assert_ne!(saved_state_1, saved_state_2);
+    }
+
+    #[test]
+    fn changed_compaction_level_not_equal() {
+        let pf_id1_level_0 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+        let pf_id1_level_1 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::FileNonOverlapped)
+            .build();
+        let pf_id2_level_2 = ParquetFileBuilder::new(2)
+            .with_compaction_level(CompactionLevel::Final)
+            .build();
+
+        let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id2_level_2]);
+        let saved_state_2 = SavedParquetFileState::from([&pf_id1_level_1, &pf_id2_level_2]);
+
+        assert_ne!(saved_state_1, saved_state_2);
+    }
+
+    #[test]
+    fn same_number_of_files_different_ids_not_equal() {
+        let pf_id1_level_0 = ParquetFileBuilder::new(1)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+        let pf_id2_level_0 = ParquetFileBuilder::new(2)
+            .with_compaction_level(CompactionLevel::Initial)
+            .build();
+        let pf_id3_level_2 = ParquetFileBuilder::new(3)
+            .with_compaction_level(CompactionLevel::Final)
+            .build();
+
+        let saved_state_1 = SavedParquetFileState::from([&pf_id1_level_0, &pf_id3_level_2]);
+        let saved_state_2 = SavedParquetFileState::from([&pf_id2_level_0, &pf_id3_level_2]);
+
+        assert_ne!(saved_state_1, saved_state_2);
+    }
+}
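
A note on the `From` bound above: because it accepts any `IntoIterator` over `&ParquetFile`, call sites can pass arrays of references (as the tests do), a slice, or a borrowed `Vec`, which is how `fetch_and_save_parquet_file_state` hands over `&catalog_files`. A hedged usage sketch (not part of the commit) reusing the builder from the tests:

    #[test]
    fn snapshot_accepts_any_ref_iterator() {
        let files = vec![
            ParquetFileBuilder::new(1)
                .with_compaction_level(CompactionLevel::Initial)
                .build(),
            ParquetFileBuilder::new(2)
                .with_compaction_level(CompactionLevel::Final)
                .build(),
        ];

        // A borrowed Vec, a slice, and an array of references all satisfy
        // `IntoIterator<Item = &ParquetFile>` and yield equal snapshots.
        let from_vec = SavedParquetFileState::from(&files);
        let from_slice = SavedParquetFileState::from(files.as_slice());
        let from_refs = SavedParquetFileState::from([&files[0], &files[1]]);

        assert_eq!(from_vec, from_slice);
        assert_eq!(from_slice, from_refs);
    }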