test: Additional coverage for the compaction operation
parent 86c50b8033
commit aa030ba132
@@ -374,6 +374,7 @@ mod tests {
         .join("\n");
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp)
+            .with_max_seq(20) // This should be irrelevant because this is a level 1 file
             .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction
         let level_1_file = partition.create_parquet_file(builder).await;

@@ -399,18 +400,30 @@ mod tests {
             .with_max_seq(2);
         let level_0_max_seq_2 = partition.create_parquet_file(builder).await;

+        let lp = vec![
+            "table,tag1=VT field_int=88i 10000", // will be deduplicated with level_0_max_seq_1
+            "table,tag1=OR field_int=99i 12000",
+        ]
+        .join("\n");
+        let builder = TestParquetFileBuilder::default()
+            .with_line_protocol(&lp)
+            .with_max_seq(5) // This should be irrelevant because this is a level 1 file
+            .with_compaction_level(CompactionLevel::FileNonOverlapped); // Prev compaction
+        let level_1_with_duplicates = partition.create_parquet_file(builder).await;
+
         let lp = vec!["table,tag2=OH,tag3=21 field_int=21i 36000"].join("\n");
         let builder = TestParquetFileBuilder::default()
             .with_line_protocol(&lp)
             .with_file_size_bytes(40 * 1024 * 1024); // Really large file
         let large_file = partition.create_parquet_file(builder).await;

-        // Order here is important! The Level 1 files are first, then Level 0 files in max seq num
-        // ascending order, as filter_parquet_files would create them
+        // Order here isn't relevant; the chunk order should ensure the level 1 files are ordered
+        // first, then the other files by max seq num.
         let parquet_files = vec![
-            level_1_file.parquet_file,
-            level_0_max_seq_1.parquet_file,
             level_0_max_seq_2.parquet_file,
+            level_1_with_duplicates.parquet_file,
+            level_0_max_seq_1.parquet_file,
+            level_1_file.parquet_file,
             large_file.parquet_file,
         ];

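Note (not part of the commit): the new comment above relies on the compactor's chunk ordering rather than on the order of the `parquet_files` vec. Below is a minimal sketch of that sort key, using made-up stand-in names (`FileMeta`, `order_for_compaction`) rather than the real catalog `ParquetFile` type or the compactor's actual ordering code.

```rust
// Illustrative only: made-up stand-in types, not the compactor's real
// catalog types or its actual chunk-ordering code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionLevel {
    Initial,           // level 0
    FileNonOverlapped, // level 1
}

#[derive(Debug)]
struct FileMeta {
    id: i64,
    compaction_level: CompactionLevel,
    max_seq: i64,
}

/// Order files the way the new test comment describes: level 1
/// (FileNonOverlapped) files first, then the remaining files in ascending
/// max sequence number. The tie-break within level 1 (also max seq here)
/// is arbitrary; the test treats max seq as irrelevant for level 1 files.
fn order_for_compaction(files: &mut [FileMeta]) {
    files.sort_by_key(|f| {
        let level_rank = match f.compaction_level {
            CompactionLevel::FileNonOverlapped => 0,
            CompactionLevel::Initial => 1,
        };
        (level_rank, f.max_seq)
    });
}

fn main() {
    let mut files = vec![
        FileMeta { id: 3, compaction_level: CompactionLevel::Initial, max_seq: 2 },
        FileMeta { id: 4, compaction_level: CompactionLevel::FileNonOverlapped, max_seq: 5 },
        FileMeta { id: 2, compaction_level: CompactionLevel::Initial, max_seq: 1 },
        FileMeta { id: 1, compaction_level: CompactionLevel::FileNonOverlapped, max_seq: 20 },
    ];
    order_for_compaction(&mut files);
    // Level 1 files (ids 4 and 1) come first, then the level 0 files (ids 2
    // and 3) in ascending max seq.
    let ids: Vec<i64> = files.iter().map(|f| f.id).collect();
    assert_eq!(ids, vec![4, 1, 2, 3]);
}
```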
@@ -480,7 +493,7 @@ mod tests {
         } = test_setup().await;

         compact_parquet_files(
-            parquet_files.into_iter().take(3).collect(),
+            parquet_files.into_iter().take(4).collect(),
             &candidate_partition,
             Arc::clone(&catalog.catalog),
             ParquetStorage::new(Arc::clone(&catalog.object_store)),
@@ -501,13 +514,13 @@ mod tests {
         // 1 large file not included in compaction
         assert_eq!(
             (files[0].id.get(), files[0].compaction_level),
-            (4, CompactionLevel::Initial)
+            (5, CompactionLevel::Initial)
         );
         // 1 newly created CompactionLevel::FileNonOverlapped file as the result of
         // compaction
         assert_eq!(
             (files[1].id.get(), files[1].compaction_level),
-            (5, CompactionLevel::FileNonOverlapped)
+            (6, CompactionLevel::FileNonOverlapped)
         );

         // ------------------------------------------------
@@ -528,6 +541,7 @@ mod tests {
                 "| 21        |      | OH   | 21   | 1970-01-01T00:00:00.000036Z |",
                 "| 270       | UT   |      |      | 1970-01-01T00:00:00.000025Z |",
                 "| 70        | UT   |      |      | 1970-01-01T00:00:00.000020Z |",
+                "| 99        | OR   |      |      | 1970-01-01T00:00:00.000012Z |",
                 "+-----------+------+------+------+-----------------------------+",
             ],
             &batches
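Note (not part of the commit): the `// will be deduplicated with level_0_max_seq_1` line and the expected table above depend on deduplication on the primary key (tag columns plus timestamp). Below is a rough sketch of that idea with hypothetical names (`Row`, `deduplicate`); it does not reproduce the compactor's actual sort-based dedup, nor decide which of the two real duplicate rows survives.

```rust
use std::collections::HashMap;

// Hypothetical, simplified row type: sorted (tag key, tag value) pairs plus a
// timestamp form the primary key; `field_int` is the only field column.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    tags: Vec<(String, String)>,
    time_ns: i64,
    field_int: i64,
}

/// Deduplicate on the primary key (tags + timestamp). Rows are fed in chunk
/// order, and a later row with the same key replaces an earlier one; which
/// chunk counts as "later" is decided by the chunk ordering discussed above,
/// which this sketch does not reproduce.
fn deduplicate(rows: impl IntoIterator<Item = Row>) -> Vec<Row> {
    let mut latest: HashMap<(Vec<(String, String)>, i64), Row> = HashMap::new();
    for row in rows {
        latest.insert((row.tags.clone(), row.time_ns), row);
    }
    let mut out: Vec<Row> = latest.into_values().collect();
    out.sort_by_key(|r| r.time_ns);
    out
}

fn main() {
    let tags = |v: &str| vec![("tag1".to_string(), v.to_string())];
    let rows = vec![
        Row { tags: tags("VT"), time_ns: 10_000, field_int: 88 }, // same key as the next row
        Row { tags: tags("VT"), time_ns: 10_000, field_int: 42 }, // hypothetical duplicate
        Row { tags: tags("OR"), time_ns: 12_000, field_int: 99 },
    ];
    // The two tag1=VT rows at 10_000 collapse to one; tag1=OR at 12_000 stays.
    assert_eq!(deduplicate(rows).len(), 2);
}
```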