diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs
index 532a5cf3d5..7172c5a822 100644
--- a/compactor/src/parquet_file_combining.rs
+++ b/compactor/src/parquet_file_combining.rs
@@ -374,6 +374,7 @@ mod tests {
         .join("\n");
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp)
+            .with_max_seq(20) // This should be irrelevant because this is a level 1 file
             .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction
         let level_1_file = partition.create_parquet_file(builder).await;
 
@@ -399,18 +400,30 @@ mod tests {
             .with_max_seq(2);
         let level_0_max_seq_2 = partition.create_parquet_file(builder).await;
 
+        let lp = vec![
+            "table,tag1=VT field_int=88i 10000", // will be deduplicated with level_0_max_seq_1
+            "table,tag1=OR field_int=99i 12000",
+        ]
+        .join("\n");
+        let builder = TestParquetFileBuilder::default()
+            .with_line_protocol(&lp)
+            .with_max_seq(5) // This should be irrelevant because this is a level 1 file
+            .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction
+        let level_1_with_duplicates = partition.create_parquet_file(builder).await;
+
         let lp = vec!["table,tag2=OH,tag3=21 field_int=21i 36000"].join("\n");
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp)
             .with_file_size_bytes(40 * 1024 * 1024); // Really large file
         let large_file = partition.create_parquet_file(builder).await;
 
-        // Order here is important! The Level 1 files are first, then Level 0 files in max seq num
-        // ascending order, as filter_parquet_files would create them
+        // Order here isn't relevant; the chunk order should ensure the level 1 files are ordered
+        // first, then the other files by max seq num.
         let parquet_files = vec![
-            level_1_file.parquet_file,
-            level_0_max_seq_1.parquet_file,
             level_0_max_seq_2.parquet_file,
+            level_1_with_duplicates.parquet_file,
+            level_0_max_seq_1.parquet_file,
+            level_1_file.parquet_file,
             large_file.parquet_file,
         ];
 
@@ -480,7 +493,7 @@ mod tests {
         } = test_setup().await;
 
         compact_parquet_files(
-            parquet_files.into_iter().take(3).collect(),
+            parquet_files.into_iter().take(4).collect(),
             &candidate_partition,
             Arc::clone(&catalog.catalog),
             ParquetStorage::new(Arc::clone(&catalog.object_store)),
@@ -501,13 +514,13 @@ mod tests {
         // 1 large file not included in compaction
         assert_eq!(
             (files[0].id.get(), files[0].compaction_level),
-            (4, CompactionLevel::Initial)
+            (5, CompactionLevel::Initial)
         );
         // 1 newly created CompactionLevel::FileNonOverlapped file as the result of
         // compaction
         assert_eq!(
             (files[1].id.get(), files[1].compaction_level),
-            (5, CompactionLevel::FileNonOverlapped)
+            (6, CompactionLevel::FileNonOverlapped)
         );
 
         // ------------------------------------------------
@@ -528,6 +541,7 @@ mod tests {
                 "| 21        |      | OH   | 21   | 1970-01-01T00:00:00.000036Z |",
                 "| 270       | UT   |      |      | 1970-01-01T00:00:00.000025Z |",
                 "| 70        | UT   |      |      | 1970-01-01T00:00:00.000020Z |",
+                "| 99        | OR   |      |      | 1970-01-01T00:00:00.000012Z |",
                 "+-----------+------+------+------+-----------------------------+",
             ],
             &batches