fix: Compact one level 0 file to avoid getting stuck on it
If compaction is called on one level 0 file, the work of compaction doesn't really need to be done, but for simplicity's sake for now, run it through the compaction query/write process rather than special casing it. This will keep us from getting stuck in the unlikely event that a partition with one level 0 file gets selected as a top candidate for compaction.pull/24376/head
parent
fbf672015e
commit
d9ca28e83a
|
@ -95,7 +95,7 @@ pub(crate) async fn compact_parquet_files(
|
|||
|
||||
let num_files = files.len();
|
||||
ensure!(
|
||||
num_files >= 2,
|
||||
num_files > 0,
|
||||
NotEnoughParquetFilesSnafu {
|
||||
num_files,
|
||||
partition_id
|
||||
|
@ -459,7 +459,7 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn one_input_file_is_an_error() {
|
||||
async fn one_input_file_gets_compacted() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
||||
let TestSetup {
|
||||
|
@ -469,16 +469,41 @@ mod tests {
|
|||
..
|
||||
} = test_setup().await;
|
||||
|
||||
let result = compact_parquet_files(
|
||||
vec![parquet_files.pop().unwrap()],
|
||||
let parquet_file = parquet_files.remove(0);
|
||||
compact_parquet_files(
|
||||
vec![parquet_file],
|
||||
&candidate_partition,
|
||||
Arc::clone(&catalog.catalog),
|
||||
ParquetStorage::new(Arc::clone(&catalog.object_store)),
|
||||
Arc::clone(&catalog.exec),
|
||||
Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
|
||||
)
|
||||
.await;
|
||||
assert_error!(result, Error::NotEnoughParquetFiles { num_files: 1, .. });
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Should have 5 non-soft-deleted files:
|
||||
//
|
||||
// - 2 initial level 0 files not compacted
|
||||
// - 2 initial level 1 files not compacted
|
||||
// - the 1 initial level 0 file that was "compacted" into 1 level 1 file
|
||||
let files = catalog
|
||||
.list_by_table_not_to_delete(candidate_partition.table.id)
|
||||
.await;
|
||||
assert_eq!(files.len(), 5);
|
||||
let files_and_levels: Vec<_> = files
|
||||
.into_iter()
|
||||
.map(|f| (f.id.get(), f.compaction_level))
|
||||
.collect();
|
||||
assert_eq!(
|
||||
files_and_levels,
|
||||
vec![
|
||||
(1, CompactionLevel::FileNonOverlapped),
|
||||
(2, CompactionLevel::Initial),
|
||||
(4, CompactionLevel::FileNonOverlapped),
|
||||
(5, CompactionLevel::Initial),
|
||||
(6, CompactionLevel::FileNonOverlapped),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
Loading…
Reference in New Issue